#!/usr/bin/env python3 """ ExaBGP Route Injection API Server Runs as an ExaBGP process — reads BGP events from stdin, writes BGP commands to stdout. Flask API runs in a background thread so HTTP requests trigger live route announcements/withdrawals. API endpoints: GET /healthz - health + active route count GET /routes - list all active routes POST /announce - announce prefixes POST /withdraw - withdraw prefixes POST /withdraw/all - withdraw everything GET /scenarios - list available scenarios POST /scenario/ - load a scenario DELETE /scenario/ - withdraw a scenario GET /peers - BGP peer state """ import sys import os import json import threading import time import logging from flask import Flask, request, jsonify # --- logging to stderr so it doesn't pollute ExaBGP's stdout pipe --- logging.basicConfig( stream=sys.stderr, level=logging.INFO, format='%(asctime)s [API] %(levelname)s %(message)s', ) log = logging.getLogger(__name__) app = Flask(__name__) # Thread-safe stdout for ExaBGP commands _stdout_lock = threading.Lock() # Active routes: prefix -> metadata dict active_routes = {} # Peer state received from ExaBGP events peer_states = {} # --------------------------------------------------------------------------- # ExaBGP command helpers # --------------------------------------------------------------------------- _quiet_mode = False def _send(cmd: str): """Write a command to ExaBGP via stdout.""" with _stdout_lock: sys.stdout.write(cmd + '\n') sys.stdout.flush() if not _quiet_mode: log.info('→ ExaBGP: %s', cmd) def _build_announce(prefix, next_hop='self', as_path=None, communities=None, med=None, local_pref=None): parts = [f'announce route {prefix} next-hop {next_hop}'] if as_path: parts.append(f'as-path [ {" ".join(str(a) for a in as_path)} ]') if communities: parts.append(f'community [ {" ".join(communities)} ]') if med is not None: parts.append(f'med {med}') if local_pref is not None: parts.append(f'local-preference {local_pref}') return ' '.join(parts) def _build_withdraw(prefix, next_hop='self'): return f'withdraw route {prefix} next-hop {next_hop}' def announce_route(prefix, next_hop='self', as_path=None, communities=None, med=None, local_pref=None): cmd = _build_announce(prefix, next_hop, as_path, communities, med, local_pref) _send(cmd) active_routes[prefix] = { 'prefix': prefix, 'next_hop': next_hop, 'as_path': as_path or [], 'communities': communities or [], 'med': med, 'local_pref': local_pref, 'announced_at': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), } def withdraw_route(prefix, next_hop='self'): _send(_build_withdraw(prefix, next_hop)) active_routes.pop(prefix, None) # --------------------------------------------------------------------------- # Flask API routes # --------------------------------------------------------------------------- @app.route('/healthz') def health(): return jsonify({ 'status': 'ok', 'active_routes': len(active_routes), 'peers': peer_states, }) @app.route('/routes', methods=['GET']) def list_routes(): return jsonify({ 'count': len(active_routes), 'routes': list(active_routes.values()), }) @app.route('/announce', methods=['POST']) def api_announce(): data = request.get_json(force=True) prefixes = data.get('prefixes', []) if not prefixes: return jsonify({'error': 'prefixes list required'}), 400 next_hop = data.get('next_hop', 'self') as_path = data.get('as_path', []) communities = data.get('communities', []) med = data.get('med') local_pref = data.get('local_pref') announced = [] for prefix in prefixes: announce_route(prefix, next_hop, as_path, communities, med, local_pref) announced.append(prefix) return jsonify({'announced': announced, 'count': len(announced)}) @app.route('/withdraw', methods=['POST']) def api_withdraw(): data = request.get_json(force=True) prefixes = data.get('prefixes', []) if not prefixes: return jsonify({'error': 'prefixes list required'}), 400 withdrawn = [] for prefix in prefixes: withdraw_route(prefix) withdrawn.append(prefix) return jsonify({'withdrawn': withdrawn, 'count': len(withdrawn)}) @app.route('/withdraw/all', methods=['POST']) def api_withdraw_all(): prefixes = list(active_routes.keys()) for prefix in prefixes: withdraw_route(prefix) return jsonify({'withdrawn': prefixes, 'count': len(prefixes)}) # --------------------------------------------------------------------------- # Scenario management # --------------------------------------------------------------------------- sys.path.insert(0, '/exabgp') from scenarios import SCENARIOS, generate_full_internet # --------------------------------------------------------------------------- # Full-table background injection # --------------------------------------------------------------------------- _injection_state = { 'active': False, 'total': 0, 'injected': 0, 'elapsed_sec': 0, 'rate_pps': 0, 'error': None, 'stop_requested': False, } _injection_lock = threading.Lock() @app.route('/scenarios', methods=['GET']) def list_scenarios(): return jsonify({ 'scenarios': { name: { 'description': s.get('description', ''), 'route_count': len(s.get('routes', [])), } for name, s in SCENARIOS.items() } }) @app.route('/scenario/', methods=['POST']) def load_scenario(name): if name not in SCENARIOS: return jsonify({'error': f'Unknown scenario: {name}. Available: {list(SCENARIOS)}'}), 404 scenario = SCENARIOS[name] announced = [] for route in scenario['routes']: prefix = route['prefix'] announce_route( prefix, next_hop=route.get('next_hop', 'self'), as_path=route.get('as_path', []), communities=route.get('communities', []), med=route.get('med'), local_pref=route.get('local_pref'), ) announced.append(prefix) log.info('Loaded scenario %s: %d routes', name, len(announced)) return jsonify({'scenario': name, 'announced': announced, 'count': len(announced)}) @app.route('/scenario/', methods=['DELETE']) def unload_scenario(name): if name not in SCENARIOS: return jsonify({'error': f'Unknown scenario: {name}'}), 404 scenario = SCENARIOS[name] withdrawn = [] for route in scenario['routes']: prefix = route['prefix'] if prefix in active_routes: withdraw_route(prefix) withdrawn.append(prefix) log.info('Unloaded scenario %s: %d routes', name, len(withdrawn)) return jsonify({'scenario': name, 'withdrawn': withdrawn, 'count': len(withdrawn)}) @app.route('/peers', methods=['GET']) def get_peers(): return jsonify({'peers': peer_states}) # --------------------------------------------------------------------------- # Full-table injection endpoints # --------------------------------------------------------------------------- def _injection_worker(count, batch_size): """Background thread: generate and inject full internet table.""" global _quiet_mode try: _quiet_mode = True # suppress per-route logging log.info('Generating %d full-table prefixes...', count) routes = generate_full_internet(count) with _injection_lock: _injection_state['total'] = len(routes) log.info('Generated %d routes, starting injection at batch_size=%d', len(routes), batch_size) start_time = time.time() for i, route in enumerate(routes): with _injection_lock: if _injection_state['stop_requested']: log.info('Injection stopped by user at %d/%d', i, len(routes)) break prefix = route['prefix'] announce_route( prefix, next_hop=route.get('next_hop', 'self'), as_path=route.get('as_path', []), communities=route.get('communities', []), med=route.get('med'), local_pref=route.get('local_pref'), ) # Update progress periodically (every batch_size routes) if (i + 1) % batch_size == 0: elapsed = time.time() - start_time with _injection_lock: _injection_state['injected'] = i + 1 _injection_state['elapsed_sec'] = round(elapsed, 1) _injection_state['rate_pps'] = round((i + 1) / elapsed, 1) if elapsed > 0 else 0 log.info('Injection progress: %d/%d (%.0f/s)', i + 1, len(routes), (i + 1) / elapsed if elapsed > 0 else 0) elapsed = time.time() - start_time with _injection_lock: _injection_state['injected'] = min(i + 1, len(routes)) _injection_state['elapsed_sec'] = round(elapsed, 1) _injection_state['rate_pps'] = round(_injection_state['injected'] / elapsed, 1) if elapsed > 0 else 0 _injection_state['active'] = False log.info('Injection complete: %d routes in %.1fs (%.0f/s)', _injection_state['injected'], elapsed, _injection_state['injected'] / elapsed if elapsed > 0 else 0) except Exception as e: log.error('Injection error: %s', e) with _injection_lock: _injection_state['error'] = str(e) _injection_state['active'] = False finally: _quiet_mode = False @app.route('/full-table/start', methods=['POST']) def start_full_table(): """Start background injection of a full IPv4 routing table. POST body (all optional): count: Number of prefixes (default 900000) batch_size: Progress update interval (default 1000) """ with _injection_lock: if _injection_state['active']: return jsonify({ 'error': 'Injection already in progress', 'state': dict(_injection_state), }), 409 data = request.get_json(force=True) if request.data else {} count = int(data.get('count', 900000)) batch_size = int(data.get('batch_size', 1000)) with _injection_lock: _injection_state.update({ 'active': True, 'total': count, 'injected': 0, 'elapsed_sec': 0, 'rate_pps': 0, 'error': None, 'stop_requested': False, }) t = threading.Thread(target=_injection_worker, args=(count, batch_size), daemon=True) t.start() log.info('Started full-table injection: %d prefixes', count) return jsonify({ 'status': 'started', 'count': count, 'message': f'Generating and injecting {count} prefixes in background. GET /full-table/status to track progress.', }) @app.route('/full-table/status', methods=['GET']) def full_table_status(): """Get current full-table injection progress.""" with _injection_lock: state = dict(_injection_state) if state['total'] > 0: state['progress_pct'] = round(state['injected'] / state['total'] * 100, 1) else: state['progress_pct'] = 0 state['active_routes'] = len(active_routes) return jsonify(state) @app.route('/full-table/stop', methods=['POST']) def stop_full_table(): """Stop an in-progress full-table injection.""" with _injection_lock: if not _injection_state['active']: return jsonify({'error': 'No injection in progress'}), 400 _injection_state['stop_requested'] = True return jsonify({'status': 'stop_requested', 'injected_so_far': _injection_state['injected']}) # --------------------------------------------------------------------------- # ExaBGP event loop (main thread) # --------------------------------------------------------------------------- def parse_exabgp_event(line: str): """Parse ExaBGP neighbor state events from stdin.""" # ExaBGP text events look like: # neighbor 10.100.0.100 up # neighbor 10.100.0.100 down parts = line.strip().split() if len(parts) >= 3 and parts[0] == 'neighbor': peer_ip = parts[1] state = parts[2] peer_states[peer_ip] = {'state': state, 'updated': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())} log.info('Peer %s → %s', peer_ip, state) def main(): api_port = int(os.environ.get('EXABGP_API_PORT', 5050)) # Start Flask in background thread flask_thread = threading.Thread( target=lambda: app.run(host='0.0.0.0', port=api_port, threaded=True), daemon=True, ) flask_thread.start() log.info('Flask API listening on port %d', api_port) # Main thread: read ExaBGP events from stdin log.info('Waiting for ExaBGP events on stdin...') while True: try: line = sys.stdin.readline() if not line: log.warning('ExaBGP stdin closed, exiting') break line = line.strip() if line: log.debug('← ExaBGP: %s', line) parse_exabgp_event(line) except KeyboardInterrupt: break except Exception as e: log.error('Event loop error: %s', e) if __name__ == '__main__': main()