418 lines
14 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
ExaBGP Route Injection API Server
Runs as an ExaBGP process: reads BGP events from stdin, writes BGP
commands to stdout. Flask API runs in a background thread so HTTP
requests trigger live route announcements/withdrawals.
API endpoints:
GET /healthz - health + active route count
GET /routes - list all active routes
POST /announce - announce prefixes
POST /withdraw - withdraw prefixes
POST /withdraw/all - withdraw everything
GET /scenarios - list available scenarios
POST /scenario/<name> - load a scenario
DELETE /scenario/<name> - withdraw a scenario
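GET /peers - BGP peer state
POST /full-table/start - start background full-table injection
GET /full-table/status - full-table injection progress
POST /full-table/stop - stop an in-progress injection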
"""
import sys
import os
import json
import threading
import time
import logging
from flask import Flask, request, jsonify
# --- logging to stderr so it doesn't pollute ExaBGP's stdout pipe ---
# stdout is reserved for the commands written by _send(), so every log record
# is routed to stderr instead.
logging.basicConfig(
    stream=sys.stderr,
    level=logging.INFO,
    format='%(asctime)s [API] %(levelname)s %(message)s',
)
log = logging.getLogger(__name__)
app = Flask(__name__)
# Thread-safe stdout for ExaBGP commands
# (held by _send() around each write + flush).
_stdout_lock = threading.Lock()
# Active routes: prefix -> metadata dict
# (filled by announce_route(), emptied by withdraw_route()).
active_routes = {}
# Peer state received from ExaBGP events
# (peer IP -> {'state': ..., 'updated': ...}, written by parse_exabgp_event()).
peer_states = {}
# ---------------------------------------------------------------------------
# ExaBGP command helpers
# ---------------------------------------------------------------------------
# When True, _send() skips its per-command log line. The full-table injection
# worker sets it for the duration of a run and resets it afterwards.
_quiet_mode = False
def _send(cmd: str):
    """Emit one command line on stdout for ExaBGP to consume.

    The write and the flush are performed while holding ``_stdout_lock`` so
    that commands sent from different threads are serialized. Unless
    ``_quiet_mode`` is set, the command is also logged.
    """
    payload = cmd + '\n'
    with _stdout_lock:
        sys.stdout.write(payload)
        sys.stdout.flush()
    if _quiet_mode:
        return
    log.info('→ ExaBGP: %s', cmd)
def _build_announce(prefix, next_hop='self', as_path=None, communities=None, med=None, local_pref=None):
parts = [f'announce route {prefix} next-hop {next_hop}']
if as_path:
parts.append(f'as-path [ {" ".join(str(a) for a in as_path)} ]')
if communities:
parts.append(f'community [ {" ".join(communities)} ]')
if med is not None:
parts.append(f'med {med}')
if local_pref is not None:
parts.append(f'local-preference {local_pref}')
return ' '.join(parts)
def _build_withdraw(prefix, next_hop='self'):
return f'withdraw route {prefix} next-hop {next_hop}'
def announce_route(prefix, next_hop='self', as_path=None, communities=None, med=None, local_pref=None):
    """Announce ``prefix`` to ExaBGP and remember it in ``active_routes``.

    The command is sent first; the route is recorded only afterwards, keyed
    by prefix, together with its attributes and a UTC ``announced_at`` stamp.
    Re-announcing a prefix overwrites its previous record.
    """
    _send(_build_announce(prefix, next_hop, as_path, communities, med, local_pref))
    stamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    active_routes[prefix] = dict(
        prefix=prefix,
        next_hop=next_hop,
        # Falsy values (None, empty list) are normalised to a fresh list.
        as_path=as_path if as_path else [],
        communities=communities if communities else [],
        med=med,
        local_pref=local_pref,
        announced_at=stamp,
    )
def withdraw_route(prefix, next_hop='self'):
    """Send a withdraw for ``prefix`` and drop it from ``active_routes``."""
    command = _build_withdraw(prefix, next_hop)
    _send(command)
    # pop() with a default makes this a no-op for prefixes that were never
    # recorded; the withdraw command is still sent either way.
    active_routes.pop(prefix, None)
# ---------------------------------------------------------------------------
# Flask API routes
# ---------------------------------------------------------------------------
@app.route('/healthz')
def health():
    """Report liveness, the number of tracked routes and known peer state."""
    payload = {
        'status': 'ok',
        'active_routes': len(active_routes),
        'peers': peer_states,
    }
    return jsonify(payload)
@app.route('/routes', methods=['GET'])
def list_routes():
    """Return every tracked route record together with the total count."""
    count = len(active_routes)
    records = [*active_routes.values()]
    return jsonify({'count': count, 'routes': records})
@app.route('/announce', methods=['POST'])
def api_announce():
    """Announce every prefix listed in the JSON request body.

    Body keys: ``prefixes`` (required, non-empty list of strings) plus the
    optional ``next_hop``, ``as_path``, ``communities``, ``med`` and
    ``local_pref``, which are applied to every prefix in the request.

    Returns:
        JSON ``{'announced': [...], 'count': N}`` on success, or a 400
        response when the body is not a JSON object, ``prefixes`` is missing,
        empty or not a list, or any prefix is not a string.
    """
    data = request.get_json(force=True)
    # Bodies such as `null` or `[...]` parse as valid JSON but have no .get().
    if not isinstance(data, dict):
        return jsonify({'error': 'prefixes list required'}), 400
    prefixes = data.get('prefixes', [])
    # A bare string must be rejected: iterating it would announce each
    # character as if it were a prefix.
    if not isinstance(prefixes, list) or not prefixes:
        return jsonify({'error': 'prefixes list required'}), 400
    if not all(isinstance(prefix, str) for prefix in prefixes):
        return jsonify({'error': 'prefixes must be a list of strings'}), 400
    next_hop = data.get('next_hop', 'self')
    as_path = data.get('as_path', [])
    communities = data.get('communities', [])
    med = data.get('med')
    local_pref = data.get('local_pref')
    announced = []
    for prefix in prefixes:
        announce_route(prefix, next_hop, as_path, communities, med, local_pref)
        announced.append(prefix)
    return jsonify({'announced': announced, 'count': len(announced)})
@app.route('/withdraw', methods=['POST'])
def api_withdraw():
    """Withdraw every prefix listed in the JSON request body.

    Body keys: ``prefixes`` (required, non-empty list of strings).

    Returns:
        JSON ``{'withdrawn': [...], 'count': N}`` on success, or a 400
        response when the body is not a JSON object, ``prefixes`` is missing,
        empty or not a list, or any prefix is not a string.
    """
    data = request.get_json(force=True)
    # Bodies such as `null` or `[...]` parse as valid JSON but have no .get().
    if not isinstance(data, dict):
        return jsonify({'error': 'prefixes list required'}), 400
    prefixes = data.get('prefixes', [])
    # A bare string must be rejected: iterating it would withdraw each
    # character as if it were a prefix.
    if not isinstance(prefixes, list) or not prefixes:
        return jsonify({'error': 'prefixes list required'}), 400
    if not all(isinstance(prefix, str) for prefix in prefixes):
        return jsonify({'error': 'prefixes must be a list of strings'}), 400
    withdrawn = []
    for prefix in prefixes:
        withdraw_route(prefix)
        withdrawn.append(prefix)
    return jsonify({'withdrawn': withdrawn, 'count': len(withdrawn)})
@app.route('/withdraw/all', methods=['POST'])
def api_withdraw_all():
    """Withdraw every route currently tracked in ``active_routes``."""
    # Snapshot the keys up front: withdraw_route() removes entries from
    # active_routes while we iterate.
    snapshot = list(active_routes)
    for entry in snapshot:
        withdraw_route(entry)
    return jsonify({'withdrawn': snapshot, 'count': len(snapshot)})
# ---------------------------------------------------------------------------
# Scenario management
# ---------------------------------------------------------------------------
sys.path.insert(0, '/exabgp')
from scenarios import SCENARIOS, generate_full_internet
# ---------------------------------------------------------------------------
# Full-table background injection
# ---------------------------------------------------------------------------
# Progress record for the background full-table injection. It is written by
# _injection_worker and the /full-table/start and /full-table/stop handlers,
# and reported by /full-table/status.
_injection_state = {
    'active': False,          # True from /full-table/start until the worker finishes or fails
    'total': 0,               # requested count, then len() of the generated route list
    'injected': 0,            # progress counter maintained by the worker
    'elapsed_sec': 0,         # seconds since injection began, rounded to 0.1
    'rate_pps': 0,            # injected / elapsed, rounded to 0.1
    'error': None,            # str(exception) when the worker fails
    'stop_requested': False,  # set by /full-table/stop, polled by the worker per route
}
# Held around most reads and writes of _injection_state.
_injection_lock = threading.Lock()
@app.route('/scenarios', methods=['GET'])
def list_scenarios():
    """List each known scenario with its description and route count."""
    summary = {}
    for name, spec in SCENARIOS.items():
        summary[name] = {
            'description': spec.get('description', ''),
            'route_count': len(spec.get('routes', [])),
        }
    return jsonify({'scenarios': summary})
@app.route('/scenario/<name>', methods=['POST'])
def load_scenario(name):
    """Announce every route of the scenario called ``name``.

    Returns:
        JSON with the scenario name, the announced prefixes and their count,
        or a 404 response listing the available scenarios when ``name`` is
        unknown.
    """
    if name not in SCENARIOS:
        return jsonify({'error': f'Unknown scenario: {name}. Available: {list(SCENARIOS)}'}), 404
    # Read 'routes' with .get(), as list_scenarios does, so a scenario defined
    # without that key loads as empty instead of failing with KeyError.
    routes = SCENARIOS[name].get('routes', [])
    announced = []
    for route in routes:
        prefix = route['prefix']
        announce_route(
            prefix,
            next_hop=route.get('next_hop', 'self'),
            as_path=route.get('as_path', []),
            communities=route.get('communities', []),
            med=route.get('med'),
            local_pref=route.get('local_pref'),
        )
        announced.append(prefix)
    log.info('Loaded scenario %s: %d routes', name, len(announced))
    return jsonify({'scenario': name, 'announced': announced, 'count': len(announced)})
@app.route('/scenario/<name>', methods=['DELETE'])
def unload_scenario(name):
    """Withdraw the routes of scenario ``name`` that are currently active.

    Prefixes of the scenario that are not present in ``active_routes`` are
    skipped and not reported.

    Returns:
        JSON with the scenario name, the withdrawn prefixes and their count,
        or a 404 response when ``name`` is unknown.
    """
    if name not in SCENARIOS:
        return jsonify({'error': f'Unknown scenario: {name}'}), 404
    # Read 'routes' with .get(), as list_scenarios does, so a scenario defined
    # without that key unloads as empty instead of failing with KeyError.
    routes = SCENARIOS[name].get('routes', [])
    withdrawn = []
    for route in routes:
        prefix = route['prefix']
        if prefix in active_routes:
            withdraw_route(prefix)
            withdrawn.append(prefix)
    log.info('Unloaded scenario %s: %d routes', name, len(withdrawn))
    return jsonify({'scenario': name, 'withdrawn': withdrawn, 'count': len(withdrawn)})
@app.route('/peers', methods=['GET'])
def get_peers():
    """Return the peer states recorded from ExaBGP events."""
    body = {'peers': peer_states}
    return jsonify(body)
# ---------------------------------------------------------------------------
# Full-table injection endpoints
# ---------------------------------------------------------------------------
def _injection_worker(count, batch_size):
    """Background thread: generate and inject full internet table.

    Args:
        count: Number of prefixes requested from ``generate_full_internet``.
        batch_size: The progress fields of ``_injection_state`` are refreshed
            (and a progress line logged) every ``batch_size`` announced
            routes. Values below 1 are treated as 1.

    Side effects:
        Sets ``_quiet_mode`` for the duration of the run and always resets it.
        Updates ``_injection_state`` under ``_injection_lock``; ``active`` is
        cleared on completion, on a requested stop and on failure, and
        ``error`` receives ``str(exception)`` on failure.
    """
    global _quiet_mode
    try:
        _quiet_mode = True  # suppress per-route logging
        # A zero interval would raise ZeroDivisionError in the modulo below.
        progress_every = max(1, batch_size)
        log.info('Generating %d full-table prefixes...', count)
        routes = generate_full_internet(count)
        total = len(routes)
        with _injection_lock:
            _injection_state['total'] = total
        log.info('Generated %d routes, starting injection at batch_size=%d', total, batch_size)
        start_time = time.time()
        # Count announcements explicitly instead of deriving the figure from
        # the loop index: the index is unbound when the table is empty and is
        # one ahead of the real count when the loop stops early.
        injected = 0
        for route in routes:
            with _injection_lock:
                stop_requested = _injection_state['stop_requested']
            if stop_requested:
                log.info('Injection stopped by user at %d/%d', injected, total)
                break
            announce_route(
                route['prefix'],
                next_hop=route.get('next_hop', 'self'),
                as_path=route.get('as_path', []),
                communities=route.get('communities', []),
                med=route.get('med'),
                local_pref=route.get('local_pref'),
            )
            injected += 1
            # Update progress periodically (every batch_size routes)
            if injected % progress_every == 0:
                elapsed = time.time() - start_time
                rate = injected / elapsed if elapsed > 0 else 0
                with _injection_lock:
                    _injection_state['injected'] = injected
                    _injection_state['elapsed_sec'] = round(elapsed, 1)
                    _injection_state['rate_pps'] = round(rate, 1)
                log.info('Injection progress: %d/%d (%.0f/s)', injected, total, rate)
        elapsed = time.time() - start_time
        rate = injected / elapsed if elapsed > 0 else 0
        with _injection_lock:
            _injection_state['injected'] = injected
            _injection_state['elapsed_sec'] = round(elapsed, 1)
            _injection_state['rate_pps'] = round(rate, 1)
            _injection_state['active'] = False
        log.info('Injection complete: %d routes in %.1fs (%.0f/s)', injected, elapsed, rate)
    except Exception as e:
        # log.exception keeps the traceback; the status endpoint only gets str(e).
        log.exception('Injection error: %s', e)
        with _injection_lock:
            _injection_state['error'] = str(e)
            _injection_state['active'] = False
    finally:
        _quiet_mode = False
@app.route('/full-table/start', methods=['POST'])
def start_full_table():
    """Start background injection of a full IPv4 routing table.

    POST body (all optional):
        count: Number of prefixes (default 900000)
        batch_size: Progress update interval (default 1000)

    Returns:
        JSON with ``status: started`` once the worker thread is launched;
        400 when the body is not a JSON object or ``count`` / ``batch_size``
        are not positive integers; 409 (including the current state) when an
        injection is already in progress.
    """
    data = request.get_json(force=True) if request.data else {}
    if not isinstance(data, dict):
        return jsonify({'error': 'JSON object body required'}), 400
    try:
        count = int(data.get('count', 900000))
        batch_size = int(data.get('batch_size', 1000))
    except (TypeError, ValueError):
        return jsonify({'error': 'count and batch_size must be integers'}), 400
    if count < 1 or batch_size < 1:
        return jsonify({'error': 'count and batch_size must be positive'}), 400
    # Check and claim the 'active' flag under one lock acquisition so two
    # concurrent requests cannot both see it clear and each start a worker.
    with _injection_lock:
        if _injection_state['active']:
            return jsonify({
                'error': 'Injection already in progress',
                'state': dict(_injection_state),
            }), 409
        _injection_state.update({
            'active': True,
            'total': count,
            'injected': 0,
            'elapsed_sec': 0,
            'rate_pps': 0,
            'error': None,
            'stop_requested': False,
        })
    t = threading.Thread(target=_injection_worker, args=(count, batch_size), daemon=True)
    t.start()
    log.info('Started full-table injection: %d prefixes', count)
    return jsonify({
        'status': 'started',
        'count': count,
        'message': f'Generating and injecting {count} prefixes in background. GET /full-table/status to track progress.',
    })
@app.route('/full-table/status', methods=['GET'])
def full_table_status():
    """Get current full-table injection progress.

    Returns a copy of ``_injection_state`` extended with ``progress_pct``
    (0 when no total is known) and the current ``active_routes`` count.
    """
    with _injection_lock:
        snapshot = _injection_state.copy()
    total = snapshot['total']
    snapshot['progress_pct'] = round(snapshot['injected'] / total * 100, 1) if total > 0 else 0
    snapshot['active_routes'] = len(active_routes)
    return jsonify(snapshot)
@app.route('/full-table/stop', methods=['POST'])
def stop_full_table():
    """Stop an in-progress full-table injection.

    Only raises the ``stop_requested`` flag; the worker notices it before
    announcing its next route. Responds with 400 when nothing is running.
    """
    with _injection_lock:
        running = _injection_state['active']
        if running:
            _injection_state['stop_requested'] = True
    if not running:
        return jsonify({'error': 'No injection in progress'}), 400
    return jsonify({'status': 'stop_requested', 'injected_so_far': _injection_state['injected']})
# ---------------------------------------------------------------------------
# ExaBGP event loop (main thread)
# ---------------------------------------------------------------------------
def parse_exabgp_event(line: str):
    """Parse ExaBGP neighbor state events from stdin.

    A line whose first token is ``neighbor`` and that has at least three
    tokens records ``peer_states[<second token>]`` with the third token as
    ``state`` and a UTC ``updated`` timestamp. Every other line is ignored.
    """
    # ExaBGP text events look like:
    # neighbor 10.100.0.100 up
    # neighbor 10.100.0.100 down
    # NOTE(review): any third token is stored as the state, so this presumably
    # relies on ExaBGP only forwarding neighbor-change events - confirm config.
    parts = line.split()
    if len(parts) < 3 or parts[0] != 'neighbor':
        return
    peer_ip, state = parts[1], parts[2]
    peer_states[peer_ip] = {'state': state, 'updated': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}
    # Keep a separator between the two fields so the entry reads
    # "Peer 10.100.0.100 → up" rather than "Peer 10.100.0.100up".
    log.info('Peer %s → %s', peer_ip, state)
def main():
    """Run the Flask API in a daemon thread and consume ExaBGP events.

    The API port comes from ``EXABGP_API_PORT`` (default 5050). The main
    thread then reads stdin line by line, handing each non-blank line to
    ``parse_exabgp_event``, until stdin reaches EOF or a KeyboardInterrupt
    arrives. Any other exception is logged and the loop continues.
    """
    api_port = int(os.environ.get('EXABGP_API_PORT', 5050))

    # Start Flask in background thread
    def _serve():
        app.run(host='0.0.0.0', port=api_port, threaded=True)

    flask_thread = threading.Thread(target=_serve, daemon=True)
    flask_thread.start()
    log.info('Flask API listening on port %d', api_port)

    # Main thread: read ExaBGP events from stdin
    log.info('Waiting for ExaBGP events on stdin...')
    while True:
        try:
            raw = sys.stdin.readline()
            if not raw:
                # readline() returns an empty string only at EOF.
                log.warning('ExaBGP stdin closed, exiting')
                break
            event = raw.strip()
            if not event:
                continue
            log.debug('← ExaBGP: %s', event)
            parse_exabgp_event(event)
        except KeyboardInterrupt:
            break
        except Exception as e:
            log.error('Event loop error: %s', e)


if __name__ == '__main__':
    main()