#!/usr/bin/env python3 """ inject.py — CLI wrapper for the ExaBGP Route Injection API Usage: inject.py status inject.py peers inject.py routes inject.py scenarios inject.py announce [...] [--as-path ASN...] [--community STR...] [--med N] [--next-hop IP] inject.py withdraw [...] inject.py withdraw-all inject.py scenario inject.py withdraw-scenario inject.py full-table [--count N] [--follow] # inject full IPv4 table (background) inject.py full-table-status # show injection progress inject.py full-table-stop # stop injection inject.py churn [--count N] [--interval SEC] # cycle announce/withdraw for ip_rib_log population inject.py monitor # live-refresh terminal view Environment: EXABGP_API=http://localhost:5050 API base URL """ import sys import os import json import time import argparse import requests API = os.environ.get('EXABGP_API', 'http://localhost:5050') def _post(path, data=None, timeout=10): r = requests.post(f'{API}{path}', json=data or {}, timeout=timeout) r.raise_for_status() return r.json() def _delete(path): r = requests.delete(f'{API}{path}', timeout=10) r.raise_for_status() return r.json() def _get(path): r = requests.get(f'{API}{path}', timeout=10) r.raise_for_status() return r.json() def _pp(data): print(json.dumps(data, indent=2)) def cmd_status(args): _pp(_get('/healthz')) def cmd_peers(args): data = _get('/peers') peers = data.get('peers', {}) if not peers: print("No peer state received yet (ExaBGP may still be establishing sessions).") return print(f"{'Peer':<20} {'State':<8} {'Updated'}") print('-' * 55) for ip, info in peers.items(): state = info.get('state', 'unknown') updated = info.get('updated', '-') indicator = 'UP' if state == 'up' else 'DOWN' print(f"{ip:<20} {indicator:<8} {updated}") def cmd_monitor(args): """Live-refreshing terminal status view. Ctrl+C to exit.""" import shutil print("OpenBMP ExaBGP Monitor (Ctrl+C to exit)\n") try: while True: try: health = _get('/healthz') peers = health.get('peers', {}) active = health.get('active_routes', 0) status = health.get('status', '?') # Clear to start of previous output using ANSI codes cols, _ = shutil.get_terminal_size(fallback=(80, 24)) peer_count = len(peers) peers_up = sum(1 for p in peers.values() if p.get('state') == 'up') lines = [ f" API: {status.upper():<8} Routes: {active:<6} Peers: {peers_up}/{peer_count} UP", '', ] for ip, info in peers.items(): state = info.get('state', 'unknown').upper() updated = info.get('updated', '-') lines.append(f" {ip:<22} {state:<6} {updated}") lines.append('') lines.append(f" Refreshing every 5s ... {time.strftime('%H:%M:%S')}") output = '\n'.join(lines) # Move cursor up to overwrite previous output print(f"\033[{len(lines) + 1}A", end='') print(output) except requests.exceptions.ConnectionError: print("\033[1A API: UNREACHABLE") time.sleep(5) except KeyboardInterrupt: print("\n\nMonitor stopped.") def cmd_routes(args): data = _get('/routes') print(f"Active routes: {data['count']}") for r in data['routes']: path = ' '.join(str(a) for a in r.get('as_path', [])) comms = ' '.join(r.get('communities', [])) parts = [r['prefix'], f'nh={r["next_hop"]}'] if path: parts.append(f'as-path=[{path}]') if comms: parts.append(f'community=[{comms}]') if r.get('med') is not None: parts.append(f'med={r["med"]}') print(' ' + ' '.join(parts)) def cmd_scenarios(args): data = _get('/scenarios') print(f"{'Name':<20} {'Routes':>6} Description") print('-' * 70) for name, info in data['scenarios'].items(): print(f"{name:<20} {info['route_count']:>6} {info['description']}") def cmd_announce(args): payload = { 'prefixes': args.prefixes, 'next_hop': args.next_hop, 'as_path': [int(a) for a in args.as_path] if args.as_path else [], 'communities': args.community or [], } if args.med is not None: payload['med'] = args.med data = _post('/announce', payload) print(f"Announced {data['count']} route(s):") for p in data['announced']: print(f" + {p}") def cmd_withdraw(args): data = _post('/withdraw', {'prefixes': args.prefixes}) print(f"Withdrew {data['count']} route(s):") for p in data['withdrawn']: print(f" - {p}") def cmd_withdraw_all(args): data = _post('/withdraw/all') print(f"Withdrew all {data['count']} route(s)") def cmd_scenario(args): data = _post(f'/scenario/{args.name}') print(f"Loaded scenario '{args.name}': {data['count']} routes announced") def cmd_withdraw_scenario(args): data = _delete(f'/scenario/{args.name}') print(f"Withdrew scenario '{args.name}': {data['count']} routes withdrawn") def cmd_full_table(args): """Inject a full IPv4 routing table for stress testing.""" count = args.count print(f"Starting full-table injection: {count} prefixes") print("This generates routes in background. Use 'inject.py full-table-status' to track.\n") data = _post('/full-table/start', {'count': count, 'batch_size': args.batch_size}, timeout=120) print(data.get('message', 'Started')) if args.follow: print() _follow_injection() def cmd_full_table_status(args): """Show full-table injection progress.""" data = _get('/full-table/status') active = data.get('active', False) total = data.get('total', 0) injected = data.get('injected', 0) pct = data.get('progress_pct', 0) rate = data.get('rate_pps', 0) elapsed = data.get('elapsed_sec', 0) error = data.get('error') active_routes = data.get('active_routes', 0) if error: print(f"ERROR: {error}") elif active: bar_len = 40 filled = int(bar_len * pct / 100) bar = '#' * filled + '-' * (bar_len - filled) print(f"[{bar}] {pct:.1f}%") print(f" Injected: {injected:,} / {total:,} ({rate:.0f} routes/s)") print(f" Elapsed: {elapsed:.0f}s") print(f" Active routes in ExaBGP: {active_routes:,}") elif total > 0: print(f"Injection complete: {injected:,} / {total:,} routes in {elapsed:.0f}s ({rate:.0f}/s)") print(f"Active routes in ExaBGP: {active_routes:,}") else: print("No injection running or completed.") print(f"Active routes: {active_routes:,}") def cmd_full_table_stop(args): """Stop an in-progress full-table injection.""" try: data = _post('/full-table/stop') print(f"Stop requested. Injected so far: {data.get('injected_so_far', '?'):,}") except requests.exceptions.HTTPError as e: if e.response.status_code == 400: print("No injection in progress.") else: raise def _follow_injection(): """Poll injection status until complete.""" import shutil lines_printed = 0 try: while True: data = _get('/full-table/status') active = data.get('active', False) total = data.get('total', 0) injected = data.get('injected', 0) pct = data.get('progress_pct', 0) rate = data.get('rate_pps', 0) elapsed = data.get('elapsed_sec', 0) active_routes = data.get('active_routes', 0) # Move cursor up to overwrite if lines_printed > 0: print(f"\033[{lines_printed}A", end='') bar_len = 40 filled = int(bar_len * pct / 100) bar = '#' * filled + '-' * (bar_len - filled) output_lines = [ f" [{bar}] {pct:.1f}%", f" Injected: {injected:,} / {total:,} ({rate:.0f} routes/s) elapsed: {elapsed:.0f}s", f" Active routes: {active_routes:,}", ] print('\n'.join(output_lines)) lines_printed = len(output_lines) if not active: print(f"\nDone! {injected:,} routes injected in {elapsed:.0f}s") break time.sleep(2) except KeyboardInterrupt: print("\n\nFollowing stopped (injection continues in background).") def cmd_churn(args): """ Cycle announce/withdraw on the 'churn' scenario to generate ip_rib_log entries and populate stats_chg_* tables in OpenBMP. """ count = args.count interval = args.interval print(f"Starting churn: {count} cycles, {interval}s interval") print("This will populate ip_rib_log and stats_chg_* tables in OpenBMP.") print("Press Ctrl+C to stop.\n") cycle = 0 try: while count == 0 or cycle < count: cycle += 1 print(f"Cycle {cycle}: announcing churn scenario...") r = _post('/scenario/churn') print(f" + {r['count']} routes announced") time.sleep(interval) print(f"Cycle {cycle}: withdrawing churn scenario...") r = _delete('/scenario/churn') print(f" - {r['count']} routes withdrawn") time.sleep(interval) print() except KeyboardInterrupt: print("\nChurn stopped. Withdrawing any active routes...") _post('/withdraw/all') print("Done.") def main(): parser = argparse.ArgumentParser( description='ExaBGP Route Injection CLI', formatter_class=argparse.RawDescriptionHelpFormatter, ) sub = parser.add_subparsers(dest='command') sub.add_parser('status', help='Show API health and peer states (JSON)') sub.add_parser('peers', help='Show BGP peer states in a readable table') sub.add_parser('routes', help='List active announced routes') sub.add_parser('scenarios', help='List available scenarios') sub.add_parser('monitor', help='Live-refreshing terminal status view') sub.add_parser('withdraw-all', help='Withdraw all active routes') p = sub.add_parser('announce', help='Announce one or more prefixes') p.add_argument('prefixes', nargs='+') p.add_argument('--as-path', nargs='+', default=[], metavar='ASN') p.add_argument('--community', nargs='+', default=[], metavar='COMM') p.add_argument('--med', type=int, default=None) p.add_argument('--next-hop', default='self', metavar='IP') p = sub.add_parser('withdraw', help='Withdraw one or more prefixes') p.add_argument('prefixes', nargs='+') p = sub.add_parser('scenario', help='Load a named scenario') p.add_argument('name') p = sub.add_parser('withdraw-scenario', help='Withdraw a named scenario') p.add_argument('name') p = sub.add_parser('full-table', help='Inject full IPv4 routing table (background)') p.add_argument('--count', type=int, default=900000, metavar='N', help='Number of prefixes to inject (default: 900000)') p.add_argument('--batch-size', type=int, default=1000, metavar='N', help='Progress update interval (default: 1000)') p.add_argument('--follow', '-f', action='store_true', help='Follow progress until complete') sub.add_parser('full-table-status', help='Show full-table injection progress') sub.add_parser('full-table-stop', help='Stop full-table injection') p = sub.add_parser('churn', help='Cycle announce/withdraw to populate ip_rib_log') p.add_argument('--count', type=int, default=0, metavar='N', help='Number of cycles (0 = infinite)') p.add_argument('--interval', type=float, default=30.0, metavar='SEC', help='Seconds between announce and withdraw (default: 30)') args = parser.parse_args() cmds = { 'status': cmd_status, 'peers': cmd_peers, 'routes': cmd_routes, 'scenarios': cmd_scenarios, 'monitor': cmd_monitor, 'announce': cmd_announce, 'withdraw': cmd_withdraw, 'withdraw-all': cmd_withdraw_all, 'scenario': cmd_scenario, 'withdraw-scenario': cmd_withdraw_scenario, 'full-table': cmd_full_table, 'full-table-status': cmd_full_table_status, 'full-table-stop': cmd_full_table_stop, 'churn': cmd_churn, } if not args.command: parser.print_help() sys.exit(1) try: cmds[args.command](args) except requests.exceptions.ConnectionError: print(f"ERROR: Cannot connect to ExaBGP API at {API}") print("Is the exabgp container running? docker compose logs exabgp") sys.exit(1) except requests.exceptions.HTTPError as e: print(f"ERROR: {e}") sys.exit(1) if __name__ == '__main__': main()