224 lines
7.0 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""obmp-evpn-consumer — OpenBMP EVPN -> PostgreSQL (roadmap E5).
Subscribes to the Kafka topic `openbmp.parsed.evpn` (the OpenBMP collector
already decodes EVPN and publishes it there) and writes BGP EVPN routes into
the `evpn_rib` table. The stock openbmp/psql-app never consumes this topic;
this process fills that gap.
Field positions are pinned to the collector 2.2.3 / message-bus v1.7 layout,
verified off the live topic. The collector parses EVPN type-2 (MAC/IP) and
type-3 (inclusive multicast) cleanly; type-5 (IP-prefix) is mis-decoded
upstream and is not relied on here.
"""
import os
import sys
import time
import psycopg2
from psycopg2.extras import execute_values
from confluent_kafka import Consumer, KafkaException
# Connection settings; each one can be overridden through the environment.
KAFKA_BROKER = os.getenv("KAFKA_BROKER", "obmp-kafka:29092")
TOPIC = os.getenv("EVPN_TOPIC", "openbmp.parsed.evpn")
GROUP_ID = os.getenv("EVPN_GROUP", "evpn-psql")
PG_DSN = os.getenv(
    "PG_DSN",
    "host=obmp-psql port=5432 dbname=openbmp user=openbmp password=openbmp",
)

# Minimum number of seconds between two database flushes (fixed, not read
# from the environment).
BATCH_SECONDS = 2.0
# Zero-based column indexes into one tab-separated parsed EVPN data row
# (collector 2.2.3, message-bus v1.7). Columns not listed here are not read.
F_ACTION = 0
F_HASH = 2
F_BASE_ATTR = 5
F_PEER_HASH = 6
F_TIMESTAMP = 9
F_ORIGIN_AS = 13
F_EXT_COMM = 19
F_PATH_ID = 24
F_RD = 27
F_RD_TYPE = 28
F_ORIG_RTR_IP = 30
F_ETH_TAG = 31
F_ESI = 32
F_MAC_LEN = 33
F_MAC = 34
F_IP_LEN = 35
F_IP = 36
F_LABEL1 = 37
F_LABEL2 = 38

# A row must extend at least to the highest index read above to be usable.
MIN_FIELDS = F_LABEL2 + 1
def log(msg):
    """Print *msg* to stdout behind a bracketed UTC timestamp, flushing at once."""
    stamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    print(f"[{stamp}] {msg}", flush=True)
def nz(s):
    """Return *s* with surrounding whitespace removed, or None if nothing remains.

    None (or any other falsy value) is treated like the empty string.
    """
    stripped = (s if s else "").strip()
    return stripped if stripped else None
def to_int(s):
    """Parse *s* as a base-10 integer; blank or unparsable input yields None."""
    cleaned = nz(s)
    try:
        return None if cleaned is None else int(cleaned)
    except ValueError:
        return None
def hex_to_int(s):
    """Parse *s* as a base-16 integer; blank or unparsable input yields None."""
    cleaned = nz(s)
    try:
        return None if cleaned is None else int(cleaned, 16)
    except ValueError:
        return None
def parse_rts(field):
    """Extract the route targets from an extended-community field.

    The field is a whitespace-separated token list such as
    'rt=65010:100 encap=8'. Only tokens starting with 'rt=' are kept, with
    that prefix removed. Returns the list of values, or None when the field
    is empty/None or holds no route target.
    """
    prefix = "rt="
    targets = []
    for token in (field or "").split():
        if token.startswith(prefix):
            targets.append(token[len(prefix):])
    return targets if targets else None
def derive_route_type(mac, orig_rtr_ip):
    """Infer the EVPN route type from which fields are populated.

    A MAC means type 2 (MAC/IP advertisement); otherwise an originating
    router IP means type 3 (inclusive multicast); anything else is reported
    as type 5 (IP-prefix).
    """
    if not mac:
        return 3 if orig_rtr_ip else 5
    return 2
def parse_message(raw):
    """Split a raw OpenBMP message into its tab-separated data rows.

    The message is 'K: V' header lines, one blank line, then the rows.
    *raw* is bytes; undecodable UTF-8 is replaced rather than raised on.
    Returns a list of field lists, one per body line containing a tab, or an
    empty list when the header/body separator is missing.
    """
    decoded = raw.decode("utf-8", errors="replace")
    _headers, separator, body = decoded.partition("\n\n")
    if not separator:
        return []
    rows = []
    for line in body.splitlines():
        if "\t" in line:
            rows.append(line.split("\t"))
    return rows
def row_to_record(r):
    """Convert one tab-split EVPN row into a dict keyed by evpn_rib column name.

    Returns None when the row has fewer than MIN_FIELDS columns. Alongside
    the column values the dict carries an "action" entry (the lower-cased
    first column) that the caller uses to tell withdraws from adds/updates.

    hash_id and peer_hash_id are only stripped; the other text fields become
    None when blank, numeric fields become None when blank or unparsable, and
    an empty RD falls back to "0:0". The Ethernet tag is parsed as
    hexadecimal; the remaining numbers as decimal.
    """
    if len(r) < MIN_FIELDS:
        return None

    mac_addr = nz(r[F_MAC])
    originator = nz(r[F_ORIG_RTR_IP])
    route_distinguisher = r[F_RD].strip()
    if not route_distinguisher:
        route_distinguisher = "0:0"

    return dict(
        action=r[F_ACTION].strip().lower(),
        hash_id=r[F_HASH].strip(),
        peer_hash_id=r[F_PEER_HASH].strip(),
        base_attr_hash_id=nz(r[F_BASE_ATTR]),
        rd=route_distinguisher,
        rd_type=to_int(r[F_RD_TYPE]),
        route_type=derive_route_type(mac_addr, originator),
        origin_as=to_int(r[F_ORIGIN_AS]),
        eth_segment_id=nz(r[F_ESI]),
        eth_tag_id=hex_to_int(r[F_ETH_TAG]),
        mac=mac_addr,
        mac_len=to_int(r[F_MAC_LEN]),
        ip=nz(r[F_IP]),
        ip_len=to_int(r[F_IP_LEN]),
        orig_router_ip=originator,
        mpls_label1=to_int(r[F_LABEL1]),
        mpls_label2=to_int(r[F_LABEL2]),
        ext_community_list=parse_rts(r[F_EXT_COMM]),
        path_id=to_int(r[F_PATH_ID]),
        timestamp=nz(r[F_TIMESTAMP]),
    )
# Columns written by INSERT_SQL, in the order flush() builds each value tuple.
# Every name is also a key of the dict produced by row_to_record(); the
# trailing iswithdrawn column is appended separately by INSERT_SQL/flush().
INSERT_COLS = (
"hash_id", "peer_hash_id", "base_attr_hash_id", "rd", "rd_type", "route_type",
"origin_as", "eth_segment_id", "eth_tag_id", "mac", "mac_len", "ip", "ip_len",
"orig_router_ip", "mpls_label1", "mpls_label2", "ext_community_list", "path_id",
"timestamp",
)
# Upsert statement passed to psycopg2's execute_values(); the lone %s is the
# placeholder for the VALUES rows. When (peer_hash_id, hash_id) already
# exists, every other column is overwritten with the incoming row and
# iswithdrawn is reset to false.
# NOTE(review): the ON CONFLICT target presumably relies on a unique
# constraint/index on (peer_hash_id, hash_id) — schema not visible here; confirm.
INSERT_SQL = f"""
INSERT INTO evpn_rib ({", ".join(INSERT_COLS)}, iswithdrawn)
VALUES %s
ON CONFLICT (peer_hash_id, hash_id) DO UPDATE SET
base_attr_hash_id = EXCLUDED.base_attr_hash_id, rd = EXCLUDED.rd,
rd_type = EXCLUDED.rd_type, route_type = EXCLUDED.route_type,
origin_as = EXCLUDED.origin_as, eth_segment_id = EXCLUDED.eth_segment_id,
eth_tag_id = EXCLUDED.eth_tag_id, mac = EXCLUDED.mac, mac_len = EXCLUDED.mac_len,
ip = EXCLUDED.ip, ip_len = EXCLUDED.ip_len,
orig_router_ip = EXCLUDED.orig_router_ip, mpls_label1 = EXCLUDED.mpls_label1,
mpls_label2 = EXCLUDED.mpls_label2, ext_community_list = EXCLUDED.ext_community_list,
path_id = EXCLUDED.path_id, timestamp = EXCLUDED.timestamp, iswithdrawn = false
"""
# Despite the name this is a soft delete: the row is kept, flagged as
# withdrawn, its base_attr_hash_id cleared and its timestamp updated.
# Parameters, in order: (timestamp, peer_hash_id, hash_id).
DELETE_SQL = """
UPDATE evpn_rib SET iswithdrawn = true, base_attr_hash_id = NULL, timestamp = %s
WHERE peer_hash_id = %s AND hash_id = %s
"""
def flush(conn, adds, dels):
    """Write one batch to evpn_rib and commit it as a single transaction.

    Args:
        conn: open psycopg2 connection.
        adds: records (dicts from row_to_record) to upsert, oldest first.
        dels: records to mark withdrawn; applied after all upserts.

    Returns None. Does nothing when both lists are empty.

    Raises:
        psycopg2.Error: propagated unchanged from the driver; the commit is
            the last database call, so a failure before it commits nothing.
    """
    if not adds and not dels:
        return

    # PostgreSQL rejects a multi-row INSERT ... ON CONFLICT DO UPDATE that
    # would touch the same row twice ("cannot affect row a second time").
    # A route updated more than once inside the batch window would therefore
    # fail the whole flush, so keep only the newest record per conflict key.
    # Later records overwrite every column anyway, so the end state matches.
    newest = {}
    for rec in adds:
        newest[(rec["peer_hash_id"], rec["hash_id"])] = rec
    tuples = [
        tuple(rec[c] for c in INSERT_COLS) + (False,) for rec in newest.values()
    ]

    with conn.cursor() as cur:
        if tuples:
            execute_values(cur, INSERT_SQL, tuples)
        for rec in dels:
            cur.execute(DELETE_SQL, (rec["timestamp"], rec["peer_hash_id"], rec["hash_id"]))
    conn.commit()
    # Report the number of rows actually sent, i.e. after de-duplication.
    log(f"flushed {len(tuples)} add/update, {len(dels)} withdraw")
def connect_pg():
    """Block until PostgreSQL is reachable and evpn_rib exists; return the connection.

    Retries every 5 seconds, logging each failure. The returned psycopg2
    connection has autocommit disabled and no transaction in progress.
    """
    while True:
        conn = None
        try:
            conn = psycopg2.connect(PG_DSN)
            conn.autocommit = False
            with conn.cursor() as cur:
                # Zero-row probe: fails if the table is missing.
                cur.execute("SELECT 1 FROM evpn_rib LIMIT 0")
            # The probe opened a transaction; end it so the session does not
            # sit "idle in transaction" until the first batch is flushed.
            conn.rollback()
            log("connected to PostgreSQL; evpn_rib present")
            return conn
        except psycopg2.Error as e:
            # Release a half-usable connection before retrying instead of
            # leaving one open per failed attempt.
            if conn is not None:
                try:
                    conn.close()
                except psycopg2.Error:
                    pass  # already unusable; nothing more to release
            log(f"PostgreSQL not ready ({e}); retrying in 5s")
            time.sleep(5)
def main():
    """Consume parsed EVPN messages from Kafka and batch them into PostgreSQL.

    Rows are buffered and flushed at most every BATCH_SECONDS. Kafka offsets
    are committed synchronously only after the database commit succeeds, so a
    crash replays the unflushed messages rather than losing them. A failed
    database write keeps the buffered rows, reconnects and retries.

    Runs until interrupted (KeyboardInterrupt) and returns None.

    Raises:
        KafkaException: when a polled message carries an error.
    """
    log(f"starting — kafka={KAFKA_BROKER} topic={TOPIC} group={GROUP_ID}")
    conn = connect_pg()
    consumer = Consumer({
        "bootstrap.servers": KAFKA_BROKER,
        "group.id": GROUP_ID,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    })
    consumer.subscribe([TOPIC])

    # Pending work keyed by the table's conflict key (peer_hash_id, hash_id).
    # flush() applies every add before every withdraw, so the buffers are
    # kept in a shape where that order gives the same result as replaying
    # the rows in arrival order:
    #   * a newer add replaces an older add for the same key (the upsert
    #     overwrites every column anyway, and one statement must not hit the
    #     same row twice);
    #   * an add arriving after a withdraw cancels that withdraw, otherwise
    #     the re-advertised route would end up flagged as withdrawn.
    adds, dels = {}, {}
    # Monotonic clock: the batch interval must not be affected by wall-clock
    # adjustments.
    last_flush = time.monotonic()
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is not None:
                if msg.error():
                    raise KafkaException(msg.error())
                for row in parse_message(msg.value()):
                    rec = row_to_record(row)
                    if rec is None:
                        continue
                    key = (rec["peer_hash_id"], rec["hash_id"])
                    if rec["action"] == "del":
                        dels[key] = rec
                    else:
                        dels.pop(key, None)
                        adds[key] = rec

            if (adds or dels) and time.monotonic() - last_flush >= BATCH_SECONDS:
                try:
                    flush(conn, list(adds.values()), list(dels.values()))
                except psycopg2.Error as e:
                    log(f"DB write failed ({e}); reconnecting")
                    # Drop the failed connection explicitly before replacing it.
                    try:
                        conn.close()
                    except psycopg2.Error:
                        pass  # already unusable; nothing more to release
                    conn = connect_pg()
                    # Buffers are kept; the flush is retried on the next pass.
                    continue
                consumer.commit(asynchronous=False)
                adds, dels = {}, {}
                last_flush = time.monotonic()
    except KeyboardInterrupt:
        log("shutting down")
    finally:
        consumer.close()
        conn.close()
# Script entry point: run the consumer and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())