obmp-docker/exabgp/inject.py

390 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
inject.py CLI wrapper for the ExaBGP Route Injection API
Usage:
inject.py status
inject.py peers
inject.py routes
inject.py scenarios
inject.py announce <prefix> [<prefix>...] [--as-path ASN...] [--community STR...] [--med N] [--next-hop IP]
inject.py withdraw <prefix> [<prefix>...]
inject.py withdraw-all
inject.py scenario <name>
inject.py withdraw-scenario <name>
inject.py full-table [--count N] [--follow] # inject full IPv4 table (background)
inject.py full-table-status # show injection progress
inject.py full-table-stop # stop injection
inject.py churn [--count N] [--interval SEC] # cycle announce/withdraw for ip_rib_log population
inject.py monitor # live-refresh terminal view
Environment:
EXABGP_API=http://localhost:5050 API base URL
"""
import sys
import os
import json
import time
import argparse
import requests
# Base URL of the route-injection HTTP API. Taken from the EXABGP_API
# environment variable when set, otherwise defaults to the local port 5050.
API = os.environ.get('EXABGP_API', 'http://localhost:5050')
def _post(path, data=None, timeout=10):
    """POST a JSON body to the API and return the decoded JSON reply.

    Args:
        path: Endpoint path, appended verbatim to the ``API`` base URL.
        data: JSON-serialisable payload; any falsy value is sent as ``{}``.
        timeout: Request timeout in seconds.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status (via ``raise_for_status``).
    """
    url = f'{API}{path}'
    body = data if data else {}
    response = requests.post(url, json=body, timeout=timeout)
    response.raise_for_status()
    return response.json()
def _delete(path):
    """Send a DELETE request to the API and return the decoded JSON reply.

    Args:
        path: Endpoint path, appended verbatim to the ``API`` base URL.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status (via ``raise_for_status``).
    """
    url = f'{API}{path}'
    response = requests.delete(url, timeout=10)
    response.raise_for_status()
    return response.json()
def _get(path):
    """Send a GET request to the API and return the decoded JSON reply.

    Args:
        path: Endpoint path, appended verbatim to the ``API`` base URL.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status (via ``raise_for_status``).
    """
    url = f'{API}{path}'
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()
def _pp(data):
    """Pretty-print *data* to stdout as JSON with a two-space indent."""
    rendered = json.dumps(data, indent=2)
    print(rendered)
def cmd_status(args):
    """Fetch ``/healthz`` and dump the raw JSON reply.

    Args:
        args: Parsed command-line namespace (not used by this command).
    """
    health = _get('/healthz')
    _pp(health)
def cmd_peers(args):
    """Print the peers reported by ``/peers`` as a three-column table.

    A peer whose ``state`` is exactly ``'up'`` is shown as ``UP``; any other
    (or missing) state is shown as ``DOWN``.

    Args:
        args: Parsed command-line namespace (not used by this command).
    """
    peers = _get('/peers').get('peers', {})
    if not peers:
        print("No peer state received yet (ExaBGP may still be establishing sessions).")
        return

    print(f"{'Peer':<20} {'State':<8} {'Updated'}")
    print('-' * 55)
    for address, details in peers.items():
        is_up = details.get('state', 'unknown') == 'up'
        last_update = details.get('updated', '-')
        label = 'UP' if is_up else 'DOWN'
        print(f"{address:<20} {label:<8} {last_update}")
def cmd_monitor(args):
    """Live-refreshing terminal status view. Ctrl+C to exit.

    Polls ``/healthz`` every 5 seconds and redraws a summary line followed by
    one line per peer. If the API cannot be reached, a single
    ``API: UNREACHABLE`` line is shown until the next successful poll.

    Args:
        args: Parsed command-line namespace (not used by this command).

    Raises:
        requests.exceptions.HTTPError: If ``/healthz`` answers with an error
            status; only connection failures are handled here.
    """
    print("OpenBMP ExaBGP Monitor (Ctrl+C to exit)\n")
    # Height of the frame currently on screen. It is 0 before the first draw,
    # so the header above is never overwritten.
    lines_printed = 0
    try:
        while True:
            try:
                health = _get('/healthz')
            except requests.exceptions.ConnectionError:
                lines = [" API: UNREACHABLE"]
            else:
                peers = health.get('peers', {})
                active = health.get('active_routes', 0)
                status = health.get('status', '?')
                peer_count = len(peers)
                peers_up = sum(1 for p in peers.values() if p.get('state') == 'up')
                lines = [
                    f" API: {status.upper():<8} Routes: {active:<6} Peers: {peers_up}/{peer_count} UP",
                    '',
                ]
                for ip, info in peers.items():
                    state = info.get('state', 'unknown').upper()
                    updated = info.get('updated', '-')
                    lines.append(f" {ip:<22} {state:<6} {updated}")
                lines.append('')
                lines.append(f" Refreshing every 5s ... {time.strftime('%H:%M:%S')}")

            if lines_printed:
                # Move up exactly as many lines as the previous frame used,
                # then erase to the end of the screen so a shorter frame
                # leaves no stale text behind.
                print(f"\033[{lines_printed}A\033[J", end='')
            print('\n'.join(lines))
            lines_printed = len(lines)
            time.sleep(5)
    except KeyboardInterrupt:
        print("\n\nMonitor stopped.")
def cmd_routes(args):
    """List the routes returned by ``/routes``, one per line.

    Each line shows the prefix and next hop, followed by the AS path,
    communities and MED when those are present.

    Args:
        args: Parsed command-line namespace (not used by this command).
    """
    reply = _get('/routes')
    print(f"Active routes: {reply['count']}")
    for route in reply['routes']:
        as_path = ' '.join(map(str, route.get('as_path', [])))
        communities = ' '.join(route.get('communities', []))
        fields = [route['prefix'], f'nh={route["next_hop"]}']
        if as_path:
            fields.append(f'as-path=[{as_path}]')
        if communities:
            fields.append(f'community=[{communities}]')
        # MED of 0 is a real value, so test against None rather than truthiness.
        if route.get('med') is not None:
            fields.append(f'med={route["med"]}')
        print(' ' + ' '.join(fields))
def cmd_scenarios(args):
    """Print the scenarios returned by ``/scenarios`` as a table.

    Columns are the scenario name, its route count and its description.

    Args:
        args: Parsed command-line namespace (not used by this command).
    """
    reply = _get('/scenarios')
    header = f"{'Name':<20} {'Routes':>6} Description"
    print(header)
    print('-' * 70)
    for scenario_name, details in reply['scenarios'].items():
        route_count = details['route_count']
        description = details['description']
        print(f"{scenario_name:<20} {route_count:>6} {description}")
def cmd_announce(args):
    """Announce the given prefixes through ``POST /announce``.

    Args:
        args: Parsed command-line namespace providing ``prefixes``,
            ``next_hop``, ``as_path``, ``community`` and ``med``.

    Raises:
        ValueError: If an AS-path entry cannot be converted to ``int``.
    """
    as_path = list(map(int, args.as_path)) if args.as_path else []
    payload = {
        'prefixes': args.prefixes,
        'next_hop': args.next_hop,
        'as_path': as_path,
        'communities': args.community or [],
    }
    # MED is only sent when the user supplied one.
    if args.med is not None:
        payload['med'] = args.med

    reply = _post('/announce', payload)
    print(f"Announced {reply['count']} route(s):")
    for prefix in reply['announced']:
        print(f" + {prefix}")
def cmd_withdraw(args):
    """Withdraw the given prefixes through ``POST /withdraw``.

    Args:
        args: Parsed command-line namespace providing ``prefixes``.
    """
    payload = {'prefixes': args.prefixes}
    reply = _post('/withdraw', payload)
    print(f"Withdrew {reply['count']} route(s):")
    for prefix in reply['withdrawn']:
        print(f" - {prefix}")
def cmd_withdraw_all(args):
    """Withdraw every active route through ``POST /withdraw/all``.

    Args:
        args: Parsed command-line namespace (not used by this command).
    """
    reply = _post('/withdraw/all')
    withdrawn = reply['count']
    print(f"Withdrew all {withdrawn} route(s)")
def cmd_scenario(args):
    """Load a named scenario through ``POST /scenario/<name>``.

    Args:
        args: Parsed command-line namespace providing ``name``.
    """
    endpoint = f'/scenario/{args.name}'
    reply = _post(endpoint)
    print(f"Loaded scenario '{args.name}': {reply['count']} routes announced")
def cmd_withdraw_scenario(args):
    """Withdraw a named scenario through ``DELETE /scenario/<name>``.

    Args:
        args: Parsed command-line namespace providing ``name``.
    """
    endpoint = f'/scenario/{args.name}'
    reply = _delete(endpoint)
    print(f"Withdrew scenario '{args.name}': {reply['count']} routes withdrawn")
def cmd_full_table(args):
    """Start a full-table injection through ``POST /full-table/start``.

    The request uses a 120-second timeout. With ``--follow`` the command then
    polls the injection status until it finishes.

    Args:
        args: Parsed command-line namespace providing ``count``,
            ``batch_size`` and ``follow``.
    """
    prefix_count = args.count
    print(f"Starting full-table injection: {prefix_count} prefixes")
    print("This generates routes in background. Use 'inject.py full-table-status' to track.\n")

    request_body = {'count': prefix_count, 'batch_size': args.batch_size}
    reply = _post('/full-table/start', request_body, timeout=120)
    print(reply.get('message', 'Started'))

    if not args.follow:
        return
    print()
    _follow_injection()
def cmd_full_table_status(args):
    """Show the full-table injection progress reported by ``/full-table/status``.

    Prints exactly one of, in priority order: the reported error, a progress
    bar for a running injection, a completion summary, or a note that nothing
    has been injected.

    Args:
        args: Parsed command-line namespace (not used by this command).
    """
    status = _get('/full-table/status')
    running = status.get('active', False)
    total = status.get('total', 0)
    injected = status.get('injected', 0)
    pct = status.get('progress_pct', 0)
    rate = status.get('rate_pps', 0)
    elapsed = status.get('elapsed_sec', 0)
    failure = status.get('error')
    active_routes = status.get('active_routes', 0)

    # A reported error takes precedence over every other state.
    if failure:
        print(f"ERROR: {failure}")
        return

    if running:
        width = 40
        done = int(width * pct / 100)
        bar = '#' * done + '-' * (width - done)
        print(f"[{bar}] {pct:.1f}%")
        print(f" Injected: {injected:,} / {total:,} ({rate:.0f} routes/s)")
        print(f" Elapsed: {elapsed:.0f}s")
        print(f" Active routes in ExaBGP: {active_routes:,}")
        return

    if total > 0:
        print(f"Injection complete: {injected:,} / {total:,} routes in {elapsed:.0f}s ({rate:.0f}/s)")
        print(f"Active routes in ExaBGP: {active_routes:,}")
        return

    print("No injection running or completed.")
    print(f"Active routes: {active_routes:,}")
def cmd_full_table_stop(args):
    """Stop an in-progress full-table injection.

    Posts to ``/full-table/stop`` and reports how many routes were injected
    so far. An HTTP 400 reply is reported as "No injection in progress."

    Args:
        args: Parsed command-line namespace (not used by this command).

    Raises:
        requests.exceptions.HTTPError: For any error status other than 400.
    """
    try:
        data = _post('/full-table/stop')
    except requests.exceptions.HTTPError as e:
        if e.response is not None and e.response.status_code == 400:
            print("No injection in progress.")
            return
        raise

    injected = data.get('injected_so_far')
    # The ',' format option is only valid for numbers; applying it to the
    # '?' placeholder would raise ValueError, so format conditionally.
    shown = f"{injected:,}" if isinstance(injected, (int, float)) else '?'
    print(f"Stop requested. Injected so far: {shown}")
def _follow_injection():
    """Poll ``/full-table/status`` every 2 seconds until the injection ends.

    Redraws a three-line progress display in place. Polling stops when the
    status reports an error (which is printed) or is no longer active.
    Ctrl+C stops the polling only; no stop request is sent to the API.
    """
    lines_printed = 0
    try:
        while True:
            data = _get('/full-table/status')
            active = data.get('active', False)
            total = data.get('total', 0)
            injected = data.get('injected', 0)
            pct = data.get('progress_pct', 0)
            rate = data.get('rate_pps', 0)
            elapsed = data.get('elapsed_sec', 0)
            active_routes = data.get('active_routes', 0)
            error = data.get('error')

            # Move the cursor back to the top of the previous frame.
            if lines_printed > 0:
                print(f"\033[{lines_printed}A", end='')

            bar_len = 40
            filled = int(bar_len * pct / 100)
            bar = '#' * filled + '-' * (bar_len - filled)
            output_lines = [
                f" [{bar}] {pct:.1f}%",
                f" Injected: {injected:,} / {total:,} ({rate:.0f} routes/s) elapsed: {elapsed:.0f}s",
                f" Active routes: {active_routes:,}",
            ]
            # "\033[K" erases each line before it is rewritten, so a shorter
            # value (e.g. a falling rate) leaves no stale trailing characters.
            print('\n'.join(f"\033[K{line}" for line in output_lines))
            lines_printed = len(output_lines)

            # Report a failed injection instead of announcing success.
            if error:
                print(f"\nERROR: {error}")
                break
            if not active:
                print(f"\nDone! {injected:,} routes injected in {elapsed:.0f}s")
                break
            time.sleep(2)
    except KeyboardInterrupt:
        print("\n\nFollowing stopped (injection continues in background).")
def cmd_churn(args):
    """Repeatedly announce and withdraw the 'churn' scenario.

    Each cycle posts to ``/scenario/churn``, waits ``interval`` seconds,
    deletes ``/scenario/churn`` and waits again. A ``count`` of 0 loops until
    interrupted. On Ctrl+C all routes are withdrawn via ``/withdraw/all``.

    Args:
        args: Parsed command-line namespace providing ``count`` and
            ``interval``.
    """
    total_cycles = args.count
    pause = args.interval
    print(f"Starting churn: {total_cycles} cycles, {pause}s interval")
    print("This will populate ip_rib_log and stats_chg_* tables in OpenBMP.")
    print("Press Ctrl+C to stop.\n")

    cycle_no = 0
    try:
        while total_cycles == 0 or cycle_no < total_cycles:
            cycle_no += 1

            print(f"Cycle {cycle_no}: announcing churn scenario...")
            announced = _post('/scenario/churn')
            print(f" + {announced['count']} routes announced")
            time.sleep(pause)

            print(f"Cycle {cycle_no}: withdrawing churn scenario...")
            withdrawn = _delete('/scenario/churn')
            print(f" - {withdrawn['count']} routes withdrawn")
            time.sleep(pause)

            print()
    except KeyboardInterrupt:
        print("\nChurn stopped. Withdrawing any active routes...")
        _post('/withdraw/all')
        print("Done.")
def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    Prints the help text and exits with status 1 when no sub-command is
    given. Any ``requests`` failure raised by a handler (connection error,
    timeout, HTTP error status, undecodable reply) is reported on stderr and
    also ends the process with status 1.
    """
    parser = argparse.ArgumentParser(
        description='ExaBGP Route Injection CLI',
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    sub = parser.add_subparsers(dest='command')

    sub.add_parser('status', help='Show API health and peer states (JSON)')
    sub.add_parser('peers', help='Show BGP peer states in a readable table')
    sub.add_parser('routes', help='List active announced routes')
    sub.add_parser('scenarios', help='List available scenarios')
    sub.add_parser('monitor', help='Live-refreshing terminal status view')
    sub.add_parser('withdraw-all', help='Withdraw all active routes')

    p = sub.add_parser('announce', help='Announce one or more prefixes')
    p.add_argument('prefixes', nargs='+')
    # type=int makes argparse reject a non-numeric ASN with a usage error
    # instead of a later ValueError traceback.
    p.add_argument('--as-path', nargs='+', default=[], type=int, metavar='ASN')
    p.add_argument('--community', nargs='+', default=[], metavar='COMM')
    p.add_argument('--med', type=int, default=None)
    p.add_argument('--next-hop', default='self', metavar='IP')

    p = sub.add_parser('withdraw', help='Withdraw one or more prefixes')
    p.add_argument('prefixes', nargs='+')

    p = sub.add_parser('scenario', help='Load a named scenario')
    p.add_argument('name')

    p = sub.add_parser('withdraw-scenario', help='Withdraw a named scenario')
    p.add_argument('name')

    p = sub.add_parser('full-table', help='Inject full IPv4 routing table (background)')
    p.add_argument('--count', type=int, default=900000, metavar='N',
                   help='Number of prefixes to inject (default: 900000)')
    p.add_argument('--batch-size', type=int, default=1000, metavar='N',
                   help='Progress update interval (default: 1000)')
    p.add_argument('--follow', '-f', action='store_true',
                   help='Follow progress until complete')

    sub.add_parser('full-table-status', help='Show full-table injection progress')
    sub.add_parser('full-table-stop', help='Stop full-table injection')

    p = sub.add_parser('churn', help='Cycle announce/withdraw to populate ip_rib_log')
    p.add_argument('--count', type=int, default=0, metavar='N',
                   help='Number of cycles (0 = infinite)')
    p.add_argument('--interval', type=float, default=30.0, metavar='SEC',
                   help='Seconds between announce and withdraw (default: 30)')

    args = parser.parse_args()

    # Sub-command name -> handler; every handler takes the parsed namespace.
    cmds = {
        'status': cmd_status,
        'peers': cmd_peers,
        'routes': cmd_routes,
        'scenarios': cmd_scenarios,
        'monitor': cmd_monitor,
        'announce': cmd_announce,
        'withdraw': cmd_withdraw,
        'withdraw-all': cmd_withdraw_all,
        'scenario': cmd_scenario,
        'withdraw-scenario': cmd_withdraw_scenario,
        'full-table': cmd_full_table,
        'full-table-status': cmd_full_table_status,
        'full-table-stop': cmd_full_table_stop,
        'churn': cmd_churn,
    }

    if not args.command:
        parser.print_help()
        sys.exit(1)

    try:
        cmds[args.command](args)
    except requests.exceptions.ConnectionError:
        print(f"ERROR: Cannot connect to ExaBGP API at {API}", file=sys.stderr)
        print("Is the exabgp container running? docker compose logs exabgp", file=sys.stderr)
        sys.exit(1)
    except requests.exceptions.Timeout:
        # Read timeouts are not ConnectionError subclasses, so they need
        # their own branch to avoid an unhandled traceback.
        print(f"ERROR: Request to ExaBGP API at {API} timed out", file=sys.stderr)
        sys.exit(1)
    except requests.exceptions.RequestException as e:
        # Covers HTTPError plus any other requests failure such as an
        # undecodable JSON reply.
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()