obmp-docker/exabgp/api/server.py

274 lines
8.3 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
ExaBGP Route Injection API Server
Runs as an ExaBGP process: reads BGP events from stdin, writes BGP
commands to stdout. Flask API runs in a background thread so HTTP
requests trigger live route announcements/withdrawals.
API endpoints:
GET /healthz - health + active route count
GET /routes - list all active routes
POST /announce - announce prefixes
POST /withdraw - withdraw prefixes
POST /withdraw/all - withdraw everything
GET /scenarios - list available scenarios
POST /scenario/<name> - load a scenario
DELETE /scenario/<name> - withdraw a scenario
GET /peers - BGP peer state
"""
import sys
import os
import json
import threading
import time
import logging
from flask import Flask, request, jsonify
# --- logging to stderr so it doesn't pollute ExaBGP's stdout pipe ---
# _send() writes ExaBGP commands to sys.stdout, so every log record is routed
# to stderr to keep the two streams separate.
logging.basicConfig(
stream=sys.stderr,
level=logging.INFO,
format='%(asctime)s [API] %(levelname)s %(message)s',
)
log = logging.getLogger(__name__)
# Flask application object; the @app.route handlers in this file register on it.
app = Flask(__name__)
# Thread-safe stdout for ExaBGP commands
# (held by _send() around each write + flush)
_stdout_lock = threading.Lock()
# Active routes: prefix -> metadata dict
# (entries are added by announce_route() and removed by withdraw_route())
active_routes = {}
# Peer state received from ExaBGP events
# (peer IP -> {'state': ..., 'updated': ...}, filled in by parse_exabgp_event())
peer_states = {}
# ---------------------------------------------------------------------------
# ExaBGP command helpers
# ---------------------------------------------------------------------------
def _send(cmd: str):
    """Write a single command line to ExaBGP via stdout.

    The command is written followed by a newline and flushed immediately,
    all while holding ``_stdout_lock`` so concurrent callers cannot
    interleave their output.

    Args:
        cmd: the command text, without a trailing newline.

    Raises:
        ValueError: if ``cmd`` contains a line break.
    """
    # Commands are newline-terminated on the pipe, so an embedded line break
    # would be read as additional commands. Refuse it rather than forwarding
    # text that may have originated from an HTTP request.
    if '\n' in cmd or '\r' in cmd:
        raise ValueError('ExaBGP command must be a single line')
    with _stdout_lock:
        sys.stdout.write(cmd + '\n')
        sys.stdout.flush()
    log.info('→ ExaBGP: %s', cmd)
def _attr_tokens(value):
    """Normalise an as-path/community argument into a list of strings."""
    if not value:
        return []
    # A bare string is one token; iterating it would split it into characters.
    if isinstance(value, str):
        return [value]
    return [str(item) for item in value]


def _build_announce(prefix, next_hop='self', as_path=None, communities=None, med=None, local_pref=None):
    """Build the text of an ``announce route`` command.

    Args:
        prefix: the route prefix, interpolated as-is.
        next_hop: next-hop value, ``'self'`` by default.
        as_path: optional iterable of AS numbers (or a single string);
            omitted from the command when empty or None.
        communities: optional iterable of communities (or a single string);
            items are stringified, so non-string values are accepted just
            like ``as_path`` items are. Omitted when empty or None.
        med: optional MED; included whenever it is not None (so 0 is kept).
        local_pref: optional local preference; included whenever not None.

    Returns:
        The command as a single space-separated string.
    """
    parts = [f'announce route {prefix} next-hop {next_hop}']
    path_tokens = _attr_tokens(as_path)
    if path_tokens:
        parts.append(f'as-path [ {" ".join(path_tokens)} ]')
    community_tokens = _attr_tokens(communities)
    if community_tokens:
        parts.append(f'community [ {" ".join(community_tokens)} ]')
    if med is not None:
        parts.append(f'med {med}')
    if local_pref is not None:
        parts.append(f'local-preference {local_pref}')
    return ' '.join(parts)
def _build_withdraw(prefix, next_hop='self'):
    """Build the text of a ``withdraw route`` command for *prefix*."""
    command = f'withdraw route {prefix} next-hop {next_hop}'
    return command
def announce_route(prefix, next_hop='self', as_path=None, communities=None, med=None, local_pref=None):
    """Send an announce command for *prefix* and record it in ``active_routes``.

    The command is sent first; the bookkeeping entry (keyed by prefix, so a
    re-announcement overwrites the previous entry) is stored afterwards with
    a UTC ``announced_at`` timestamp.
    """
    command = _build_announce(prefix, next_hop, as_path, communities, med, local_pref)
    _send(command)

    stamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    record = {
        'prefix': prefix,
        'next_hop': next_hop,
        'as_path': as_path or [],
        'communities': communities or [],
        'med': med,
        'local_pref': local_pref,
        'announced_at': stamp,
    }
    active_routes[prefix] = record
def withdraw_route(prefix, next_hop='self'):
    """Send a withdraw command for *prefix* and drop it from ``active_routes``."""
    command = _build_withdraw(prefix, next_hop)
    _send(command)
    # A prefix that was never recorded is not an error: pop with a default.
    active_routes.pop(prefix, None)
# ---------------------------------------------------------------------------
# Flask API routes
# ---------------------------------------------------------------------------
@app.route('/healthz')
def health():
    """Report a static 'ok' status, the active route count and peer states."""
    payload = {
        'status': 'ok',
        'active_routes': len(active_routes),
        'peers': peer_states,
    }
    return jsonify(payload)
@app.route('/routes', methods=['GET'])
def list_routes():
    """Return every recorded route's metadata together with the route count."""
    route_count = len(active_routes)
    route_records = list(active_routes.values())
    return jsonify({'count': route_count, 'routes': route_records})
@app.route('/announce', methods=['POST'])
def api_announce():
    """Announce one or more prefixes that share a single set of attributes.

    Expects a JSON object body with:
        prefixes     non-empty list of prefix strings (required)
        next_hop     optional string, defaults to 'self'
        as_path      optional list
        communities  optional list
        med          optional
        local_pref   optional

    The whole request is validated before anything is sent, so a rejected
    request announces nothing.

    Returns:
        JSON ``{'announced': [...], 'count': N}`` on success, or a JSON
        ``{'error': ...}`` body with status 400 when the request is malformed.
    """
    # silent=True turns an unparsable body into None instead of an exception,
    # so it gets the same JSON 400 as any other malformed request.
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'JSON object body required'}), 400

    prefixes = data.get('prefixes')
    # Must be a real list: a bare string would otherwise be iterated
    # character by character.
    if not prefixes or not isinstance(prefixes, list):
        return jsonify({'error': 'prefixes list required'}), 400

    # Each prefix is interpolated into a command line, so it has to be a
    # single non-empty token (p.split() == [p] means "no whitespace at all").
    invalid = [p for p in prefixes if not isinstance(p, str) or p.split() != [p]]
    if invalid:
        return jsonify({'error': 'invalid prefixes', 'invalid': invalid}), 400

    # `or` also maps an explicit JSON null onto the default.
    next_hop = data.get('next_hop') or 'self'
    if not isinstance(next_hop, str) or next_hop.split() != [next_hop]:
        return jsonify({'error': 'next_hop must be a single token'}), 400

    as_path = data.get('as_path') or []
    communities = data.get('communities') or []
    if not isinstance(as_path, list) or not isinstance(communities, list):
        return jsonify({'error': 'as_path and communities must be lists'}), 400

    med = data.get('med')
    local_pref = data.get('local_pref')

    for prefix in prefixes:
        announce_route(prefix, next_hop, as_path, communities, med, local_pref)
    return jsonify({'announced': prefixes, 'count': len(prefixes)})
@app.route('/withdraw', methods=['POST'])
def api_withdraw():
    """Withdraw the prefixes listed in the request body.

    Expects a JSON object body with a non-empty ``prefixes`` list of prefix
    strings. The whole list is validated before anything is sent, so a
    rejected request withdraws nothing.

    Returns:
        JSON ``{'withdrawn': [...], 'count': N}`` on success, or a JSON
        ``{'error': ...}`` body with status 400 when the request is malformed.
    """
    # silent=True turns an unparsable body into None instead of an exception,
    # so it gets the same JSON 400 as any other malformed request.
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'JSON object body required'}), 400

    prefixes = data.get('prefixes')
    # Must be a real list: a bare string would otherwise be iterated
    # character by character.
    if not prefixes or not isinstance(prefixes, list):
        return jsonify({'error': 'prefixes list required'}), 400

    # Each prefix is interpolated into a command line, so it has to be a
    # single non-empty token (p.split() == [p] means "no whitespace at all").
    invalid = [p for p in prefixes if not isinstance(p, str) or p.split() != [p]]
    if invalid:
        return jsonify({'error': 'invalid prefixes', 'invalid': invalid}), 400

    for prefix in prefixes:
        withdraw_route(prefix)
    return jsonify({'withdrawn': prefixes, 'count': len(prefixes)})
@app.route('/withdraw/all', methods=['POST'])
def api_withdraw_all():
    """Withdraw every prefix currently recorded in ``active_routes``."""
    # Snapshot the keys first: withdraw_route() mutates active_routes.
    targets = list(active_routes)
    for target in targets:
        withdraw_route(target)
    return jsonify({'withdrawn': targets, 'count': len(targets)})
# ---------------------------------------------------------------------------
# Scenario management
# ---------------------------------------------------------------------------
sys.path.insert(0, '/exabgp')
from scenarios import SCENARIOS
@app.route('/scenarios', methods=['GET'])
def list_scenarios():
    """List each scenario in SCENARIOS with its description and route count."""
    summary = {}
    for scenario_name, spec in SCENARIOS.items():
        summary[scenario_name] = {
            'description': spec.get('description', ''),
            'route_count': len(spec.get('routes', [])),
        }
    return jsonify({'scenarios': summary})
@app.route('/scenario/<name>', methods=['POST'])
def load_scenario(name):
    """Announce every route of the named scenario.

    Args:
        name: scenario key taken from the URL.

    Returns:
        JSON ``{'scenario', 'announced', 'count'}`` on success, or a 404 JSON
        error listing the available scenario names when *name* is unknown.
    """
    if name not in SCENARIOS:
        return jsonify({'error': f'Unknown scenario: {name}. Available: {list(SCENARIOS)}'}), 404
    # A scenario without a 'routes' key is treated as empty, matching how
    # list_scenarios() counts its routes, instead of raising KeyError.
    routes = SCENARIOS[name].get('routes', [])
    announced = []
    for route in routes:
        prefix = route['prefix']
        announce_route(
            prefix,
            next_hop=route.get('next_hop', 'self'),
            as_path=route.get('as_path', []),
            communities=route.get('communities', []),
            med=route.get('med'),
            local_pref=route.get('local_pref'),
        )
        announced.append(prefix)
    log.info('Loaded scenario %s: %d routes', name, len(announced))
    return jsonify({'scenario': name, 'announced': announced, 'count': len(announced)})
@app.route('/scenario/<name>', methods=['DELETE'])
def unload_scenario(name):
    """Withdraw the named scenario's routes that are currently active.

    Only prefixes present in ``active_routes`` are withdrawn; the rest of the
    scenario's prefixes are skipped.

    Args:
        name: scenario key taken from the URL.

    Returns:
        JSON ``{'scenario', 'withdrawn', 'count'}`` on success, or a 404 JSON
        error when *name* is unknown.
    """
    if name not in SCENARIOS:
        return jsonify({'error': f'Unknown scenario: {name}'}), 404
    # A scenario without a 'routes' key is treated as empty, matching how
    # list_scenarios() counts its routes, instead of raising KeyError.
    routes = SCENARIOS[name].get('routes', [])
    withdrawn = []
    for route in routes:
        prefix = route['prefix']
        if prefix in active_routes:
            withdraw_route(prefix)
            withdrawn.append(prefix)
    log.info('Unloaded scenario %s: %d routes', name, len(withdrawn))
    return jsonify({'scenario': name, 'withdrawn': withdrawn, 'count': len(withdrawn)})
@app.route('/peers', methods=['GET'])
def get_peers():
    """Return the peer states recorded so far."""
    body = {'peers': peer_states}
    return jsonify(body)
# ---------------------------------------------------------------------------
# ExaBGP event loop (main thread)
# ---------------------------------------------------------------------------
def parse_exabgp_event(line: str):
    """Parse ExaBGP neighbor state events from stdin.

    Lines whose first word is ``neighbor`` and that have at least three
    words update ``peer_states``: the second word is used as the peer IP and
    the third as its state, stored with a UTC timestamp. Any other line is
    ignored.

    Args:
        line: one text line received from ExaBGP.
    """
    # ExaBGP text events look like:
    # neighbor 10.100.0.100 up
    # neighbor 10.100.0.100 down
    fields = line.split()
    if len(fields) < 3 or fields[0] != 'neighbor':
        return
    peer_ip, state = fields[1], fields[2]
    peer_states[peer_ip] = {'state': state, 'updated': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}
    # Keep the IP and the state apart in the log line (the previous format
    # string ran them together, e.g. "10.100.0.100up").
    log.info('Peer %s is %s', peer_ip, state)
def main():
    """Serve the Flask API from a daemon thread and read ExaBGP events on stdin.

    The port comes from the EXABGP_API_PORT environment variable (5050 when
    unset). The main thread then reads stdin line by line, handing every
    non-blank line to parse_exabgp_event(), until stdin reaches EOF or a
    KeyboardInterrupt arrives. Any other exception is logged and the loop
    carries on.
    """
    port_setting = os.environ.get('EXABGP_API_PORT', 5050)
    api_port = int(port_setting)

    # Start Flask in background thread
    flask_thread = threading.Thread(
        target=app.run,
        kwargs={'host': '0.0.0.0', 'port': api_port, 'threaded': True},
        daemon=True,
    )
    flask_thread.start()
    log.info('Flask API listening on port %d', api_port)

    # Main thread: read ExaBGP events from stdin
    log.info('Waiting for ExaBGP events on stdin...')
    while True:
        try:
            raw = sys.stdin.readline()
            # readline() returns an empty string only at EOF.
            if not raw:
                log.warning('ExaBGP stdin closed, exiting')
                break
            event = raw.strip()
            if not event:
                continue
            log.debug('← ExaBGP: %s', event)
            parse_exabgp_event(event)
        except KeyboardInterrupt:
            break
        except Exception as exc:
            log.error('Event loop error: %s', exc)


if __name__ == '__main__':
    main()