#!/usr/bin/env python3
"""obmp-evpn-consumer — OpenBMP EVPN -> PostgreSQL (roadmap E5).

Subscribes to the Kafka topic `openbmp.parsed.evpn` (the OpenBMP collector
already decodes EVPN and publishes it there) and writes BGP EVPN routes into
the `evpn_rib` table. The stock openbmp/psql-app never consumes this topic;
this process fills that gap.

Field positions are pinned to the collector 2.2.3 / message-bus v1.7 layout,
verified off the live topic. The collector parses EVPN type-2 (MAC/IP) and
type-3 (inclusive multicast) cleanly; type-5 (IP-prefix) is mis-decoded
upstream and is not relied on here.
"""

import os
import sys
import time

import psycopg2
from psycopg2.extras import execute_values
from confluent_kafka import Consumer, KafkaException

# Runtime settings. Each one can be overridden through the environment; the
# literals below are the defaults used when the variable is unset.
KAFKA_BROKER = os.environ.get("KAFKA_BROKER", "obmp-kafka:29092")
TOPIC = os.environ.get("EVPN_TOPIC", "openbmp.parsed.evpn")
GROUP_ID = os.environ.get("EVPN_GROUP", "evpn-psql")
# NOTE(review): the default DSN embeds a password; set PG_DSN explicitly for
# anything other than a lab deployment.
PG_DSN = os.environ.get(
    "PG_DSN",
    "host=obmp-psql port=5432 dbname=openbmp user=openbmp password=openbmp",
)
# Minimum number of seconds between two database flushes. Overridable like
# the other settings; a non-numeric value raises ValueError at start-up
# rather than silently falling back.
BATCH_SECONDS = float(os.environ.get("EVPN_BATCH_SECONDS", "2.0"))

# 0-indexed field positions in a parsed EVPN data row (collector 2.2.3, v1.7).
F_ACTION, F_HASH = 0, 2
F_BASE_ATTR, F_PEER_HASH = 5, 6
F_TIMESTAMP = 9
F_ORIGIN_AS = 13
F_EXT_COMM = 19
F_PATH_ID = 24
F_RD, F_RD_TYPE = 27, 28
F_ORIG_RTR_IP = 30
F_ETH_TAG, F_ESI = 31, 32
F_MAC_LEN, F_MAC = 33, 34
F_IP_LEN, F_IP = 35, 36
F_LABEL1, F_LABEL2 = 37, 38
# Highest index used above is F_LABEL2 (38), so a usable row has >= 39 fields.
MIN_FIELDS = 39


def log(msg):
    """Print *msg* to stdout, prefixed with the current UTC time, and flush."""
    print(f"[{time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}] {msg}", flush=True)


def nz(s):
    """Return *s* stripped of surrounding whitespace, or None if that is empty.

    None is accepted and treated like an empty string.
    """
    s = (s or "").strip()
    return s or None


def to_int(s):
    """Parse *s* as a base-10 integer; return None for blank or invalid input."""
    s = nz(s)
    if s is None:
        return None
    try:
        return int(s)
    except ValueError:
        return None


def hex_to_int(s):
    """Parse *s* as a base-16 integer; return None for blank or invalid input."""
    s = nz(s)
    if s is None:
        return None
    try:
        return int(s, 16)
    except ValueError:
        return None


def parse_rts(field):
    """The ext-community field looks like 'rt=65010:100 encap=8' — keep the RTs.

    Returns the list of values following each ``rt=`` token, or None when the
    field holds no such token.
    """
    rts = [t[3:] for t in (field or "").split() if t.startswith("rt=")]
    return rts or None


def derive_route_type(mac, orig_rtr_ip):
    """Infer the EVPN route type from which fields of the row are populated."""
    if mac:
        return 2  # MAC/IP advertisement
    if orig_rtr_ip:
        return 3  # inclusive multicast
    return 5  # IP-prefix


def parse_message(raw):
    """OpenBMP message: 'K: V' header lines, a blank line, then R tab-sep rows.

    Args:
        raw: Message payload as bytes, or None when the Kafka message carries
            no value.

    Returns:
        A list of rows, each a list of the tab-separated fields of one body
        line. Body lines without a tab are ignored. An empty list is returned
        for a missing payload or one that has no header/body separator.
    """
    if not raw:
        # A message without a payload has nothing to decode; previously this
        # raised AttributeError on None.decode and took the consumer down.
        return []
    text = raw.decode("utf-8", errors="replace")
    _, sep, body = text.partition("\n\n")
    if not sep:
        return []
    return [ln.split("\t") for ln in body.splitlines() if "\t" in ln]


def row_to_record(r):
    """Convert one tab-split data row into a dict keyed by column name.

    Args:
        r: List of string fields as produced by parse_message().

    Returns:
        A dict holding "action" plus every column in INSERT_COLS, or None when
        the row is shorter than MIN_FIELDS or lacks either hash id.
    """
    if len(r) < MIN_FIELDS:
        return None
    hash_id = r[F_HASH].strip()
    peer_hash_id = r[F_PEER_HASH].strip()
    if not hash_id or not peer_hash_id:
        # (peer_hash_id, hash_id) is the upsert/withdraw key; a row missing
        # either half cannot be addressed in evpn_rib, so drop it here.
        return None
    mac = nz(r[F_MAC])
    orig_rtr_ip = nz(r[F_ORIG_RTR_IP])
    return {
        "action": r[F_ACTION].strip().lower(),
        "hash_id": hash_id,
        "peer_hash_id": peer_hash_id,
        "base_attr_hash_id": nz(r[F_BASE_ATTR]),
        "rd": r[F_RD].strip() or "0:0",
        "rd_type": to_int(r[F_RD_TYPE]),
        "route_type": derive_route_type(mac, orig_rtr_ip),
        "origin_as": to_int(r[F_ORIGIN_AS]),
        "eth_segment_id": nz(r[F_ESI]),
        "eth_tag_id": hex_to_int(r[F_ETH_TAG]),
        "mac": mac,
        "mac_len": to_int(r[F_MAC_LEN]),
        "ip": nz(r[F_IP]),
        "ip_len": to_int(r[F_IP_LEN]),
        "orig_router_ip": orig_rtr_ip,
        "mpls_label1": to_int(r[F_LABEL1]),
        "mpls_label2": to_int(r[F_LABEL2]),
        "ext_community_list": parse_rts(r[F_EXT_COMM]),
        "path_id": to_int(r[F_PATH_ID]),
        "timestamp": nz(r[F_TIMESTAMP]),
    }


# Columns written by the upsert, in the order the value tuples are built.
INSERT_COLS = (
    "hash_id", "peer_hash_id", "base_attr_hash_id", "rd", "rd_type",
    "route_type", "origin_as", "eth_segment_id", "eth_tag_id", "mac",
    "mac_len", "ip", "ip_len", "orig_router_ip", "mpls_label1",
    "mpls_label2", "ext_community_list", "path_id", "timestamp",
)

# Upsert keyed on (peer_hash_id, hash_id); an update also clears iswithdrawn.
INSERT_SQL = f"""
INSERT INTO evpn_rib ({", ".join(INSERT_COLS)}, iswithdrawn)
VALUES %s
ON CONFLICT (peer_hash_id, hash_id) DO UPDATE SET
    base_attr_hash_id = EXCLUDED.base_attr_hash_id,
    rd = EXCLUDED.rd,
    rd_type = EXCLUDED.rd_type,
    route_type = EXCLUDED.route_type,
    origin_as = EXCLUDED.origin_as,
    eth_segment_id = EXCLUDED.eth_segment_id,
    eth_tag_id = EXCLUDED.eth_tag_id,
    mac = EXCLUDED.mac,
    mac_len = EXCLUDED.mac_len,
    ip = EXCLUDED.ip,
    ip_len = EXCLUDED.ip_len,
    orig_router_ip = EXCLUDED.orig_router_ip,
    mpls_label1 = EXCLUDED.mpls_label1,
    mpls_label2 = EXCLUDED.mpls_label2,
    ext_community_list = EXCLUDED.ext_community_list,
    path_id = EXCLUDED.path_id,
    timestamp = EXCLUDED.timestamp,
    iswithdrawn = false
"""

# A withdraw keeps the row but marks it withdrawn (soft delete).
DELETE_SQL = """
UPDATE evpn_rib
SET iswithdrawn = true, base_attr_hash_id = NULL, timestamp = %s
WHERE peer_hash_id = %s AND hash_id = %s
"""


def record_key(rec):
    """Return the (peer_hash_id, hash_id) pair that identifies *rec*'s route."""
    return (rec["peer_hash_id"], rec["hash_id"])


def latest_by_key(records):
    """Return *records* with only the last entry kept for each route key."""
    return list({record_key(rec): rec for rec in records}.values())


def queue_record(adds, dels, rec):
    """Fold *rec* into the pending batch so the batch replays in stream order.

    Args:
        adds: Dict of pending add/update records, keyed by record_key().
        dels: Dict of pending withdraw records, keyed by record_key().
        rec: Record produced by row_to_record().

    flush() applies all adds before all withdraws. Keeping one entry per key,
    and dropping a pending withdraw when a newer add arrives for the same key,
    makes that fixed order give the same final row state as applying the
    records one by one: the upsert overwrites everything a withdraw touches.
    """
    key = record_key(rec)
    if rec["action"] == "del":
        dels[key] = rec
    else:
        adds[key] = rec
        dels.pop(key, None)


def close_quietly(conn):
    """Close *conn*, ignoring database errors from an already-broken session."""
    try:
        conn.close()
    except psycopg2.Error:
        # The connection is being discarded; a failure to close it cleanly
        # leaves nothing further to do.
        pass


def flush(conn, adds, dels):
    """Write one batch to evpn_rib in a single transaction and commit it.

    Args:
        conn: Open psycopg2 connection.
        adds: Iterable of records to upsert.
        dels: Iterable of records to mark withdrawn.

    Raises:
        psycopg2.Error: If any statement or the commit fails; nothing from the
            batch is committed in that case.

    All adds are applied before all withdraws; callers that need stream order
    within a batch should build both collections with queue_record().
    """
    if not adds and not dels:
        return
    # One INSERT ... ON CONFLICT DO UPDATE statement may not touch the same
    # row twice, so repeated keys must be collapsed before the bulk upsert.
    adds = latest_by_key(adds)
    with conn.cursor() as cur:
        if adds:
            tuples = [
                tuple(rec[c] for c in INSERT_COLS) + (False,) for rec in adds
            ]
            execute_values(cur, INSERT_SQL, tuples)
        for rec in dels:
            cur.execute(
                DELETE_SQL,
                (rec["timestamp"], rec["peer_hash_id"], rec["hash_id"]),
            )
    conn.commit()
    log(f"flushed {len(adds)} add/update, {len(dels)} withdraw")


def connect_pg():
    """Block until PostgreSQL accepts a connection and evpn_rib is queryable.

    Returns:
        A psycopg2 connection with autocommit disabled and no open
        transaction. Retries every 5 seconds for as long as it takes.
    """
    while True:
        conn = None
        try:
            conn = psycopg2.connect(PG_DSN)
            conn.autocommit = False
            with conn.cursor() as cur:
                cur.execute("SELECT 1 FROM evpn_rib LIMIT 0")
            # End the transaction opened by the probe so the session does not
            # sit idle-in-transaction until the first flush.
            conn.rollback()
            log("connected to PostgreSQL; evpn_rib present")
            return conn
        except psycopg2.Error as e:
            if conn is not None:
                # The connect succeeded but the probe failed (e.g. table not
                # created yet); release it instead of leaking one per retry.
                close_quietly(conn)
            log(f"PostgreSQL not ready ({e}); retrying in 5s")
            time.sleep(5)


def main():
    """Consume the EVPN topic and mirror it into evpn_rib until interrupted.

    Records are batched and flushed at most once per BATCH_SECONDS. Kafka
    offsets are committed only after a successful database commit, so a crash
    or failed flush replays messages rather than losing them.

    Raises:
        KafkaException: If a polled message carries an error.
    """
    log(f"starting — kafka={KAFKA_BROKER} topic={TOPIC} group={GROUP_ID}")
    conn = connect_pg()
    consumer = Consumer({
        "bootstrap.servers": KAFKA_BROKER,
        "group.id": GROUP_ID,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    })
    consumer.subscribe([TOPIC])
    # Pending batch, one entry per route key (see queue_record).
    adds, dels = {}, {}
    # Monotonic clock: the flush interval must not be affected by wall-clock
    # adjustments.
    last_flush = time.monotonic()
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is not None:
                if msg.error():
                    raise KafkaException(msg.error())
                for row in parse_message(msg.value()):
                    rec = row_to_record(row)
                    if rec is not None:
                        queue_record(adds, dels, rec)
            if (adds or dels) and time.monotonic() - last_flush >= BATCH_SECONDS:
                try:
                    flush(conn, list(adds.values()), list(dels.values()))
                except psycopg2.Error as e:
                    log(f"DB write failed ({e}); reconnecting")
                    # Drop the failed session before opening a new one; the
                    # batch is kept and retried on a later iteration.
                    # NOTE(review): an error caused by the data itself would
                    # be retried indefinitely — confirm whether that is wanted.
                    close_quietly(conn)
                    conn = connect_pg()
                    continue
                consumer.commit(asynchronous=False)
                adds.clear()
                dels.clear()
                last_flush = time.monotonic()
    except KeyboardInterrupt:
        log("shutting down")
    finally:
        try:
            consumer.close()
        finally:
            # Runs even when closing the consumer raises.
            conn.close()


if __name__ == "__main__":
    sys.exit(main())