From 107cbf6ac5505c88b4dfc8391e14471adf4970d0 Mon Sep 17 00:00:00 2001 From: sam Date: Tue, 19 May 2026 09:28:19 -0700 Subject: [PATCH] Add obmp-evpn-consumer: openbmp.parsed.evpn -> evpn_rib (roadmap E5) A standalone Python Kafka consumer that subscribes to the openbmp.parsed.evpn topic (which the stock psql-app ignores) and writes BGP EVPN routes into evpn_rib. Field positions are pinned to the verified collector 2.2.3 / v1.7 message layout; route_type is derived from which fields populate. Profile-gated ('evpn-test') alongside the gobgp-evpn injector. Verified end to end: 5 injected type-2/type-3 routes land in evpn_rib with correct RD, ethernet-tag, MAC, IP, label and route-target. Co-Authored-By: Claude Opus 4.7 (1M context) --- docker-compose.yml | 18 +++ obmp-evpn-consumer/Dockerfile | 8 + obmp-evpn-consumer/consumer.py | 223 ++++++++++++++++++++++++++++ obmp-evpn-consumer/requirements.txt | 2 + 4 files changed, 251 insertions(+) create mode 100644 obmp-evpn-consumer/Dockerfile create mode 100644 obmp-evpn-consumer/consumer.py create mode 100644 obmp-evpn-consumer/requirements.txt diff --git a/docker-compose.yml b/docker-compose.yml index e42c977..874c888 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -466,6 +466,24 @@ services: - ./gobgp-evpn:/config command: ["gobgpd", "-f", "/config/gobgpd.conf", "-t", "toml"] + # EVPN consumer -- subscribes to the openbmp.parsed.evpn Kafka topic (which + # the collector already populates) and writes BGP EVPN routes into evpn_rib; + # the stock psql-app does not handle EVPN. 
Profile-gated alongside the EVPN + # test injector: docker compose --profile evpn-test up -d + evpn-consumer: + restart: unless-stopped + container_name: obmp-evpn-consumer + profiles: ["evpn-test"] + build: + context: ./obmp-evpn-consumer + depends_on: + - kafka + - psql + environment: + - KAFKA_BROKER=obmp-kafka:29092 + - EVPN_TOPIC=openbmp.parsed.evpn + - PG_DSN=host=obmp-psql port=5432 dbname=openbmp user=openbmp password=${POSTGRES_PASSWORD:-openbmp} + whois: restart: unless-stopped container_name: obmp-whois diff --git a/obmp-evpn-consumer/Dockerfile b/obmp-evpn-consumer/Dockerfile new file mode 100644 index 0000000..21a16fc --- /dev/null +++ b/obmp-evpn-consumer/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.12-slim + +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY consumer.py . + +CMD ["python", "-u", "consumer.py"] diff --git a/obmp-evpn-consumer/consumer.py b/obmp-evpn-consumer/consumer.py new file mode 100644 index 0000000..558ca6a --- /dev/null +++ b/obmp-evpn-consumer/consumer.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python3 +"""obmp-evpn-consumer — OpenBMP EVPN -> PostgreSQL (roadmap E5). + +Subscribes to the Kafka topic `openbmp.parsed.evpn` (the OpenBMP collector +already decodes EVPN and publishes it there) and writes BGP EVPN routes into +the `evpn_rib` table. The stock openbmp/psql-app never consumes this topic; +this process fills that gap. + +Field positions are pinned to the collector 2.2.3 / message-bus v1.7 layout, +verified off the live topic. The collector parses EVPN type-2 (MAC/IP) and +type-3 (inclusive multicast) cleanly; type-5 (IP-prefix) is mis-decoded +upstream and is not relied on here. 
import os
import signal
import sys
import time

import psycopg2
from confluent_kafka import Consumer, KafkaException
from psycopg2.extras import execute_values

KAFKA_BROKER = os.environ.get("KAFKA_BROKER", "obmp-kafka:29092")
TOPIC = os.environ.get("EVPN_TOPIC", "openbmp.parsed.evpn")
GROUP_ID = os.environ.get("EVPN_GROUP", "evpn-psql")
PG_DSN = os.environ.get(
    "PG_DSN", "host=obmp-psql port=5432 dbname=openbmp user=openbmp password=openbmp"
)
# Minimum number of seconds between two batch writes to PostgreSQL.
BATCH_SECONDS = 2.0

# 0-indexed field positions in a parsed EVPN data row (collector 2.2.3, v1.7).
F_ACTION, F_HASH = 0, 2
F_BASE_ATTR, F_PEER_HASH = 5, 6
F_TIMESTAMP = 9
F_ORIGIN_AS = 13
F_EXT_COMM = 19
F_PATH_ID = 24
F_RD, F_RD_TYPE = 27, 28
F_ORIG_RTR_IP = 30
F_ETH_TAG, F_ESI = 31, 32
F_MAC_LEN, F_MAC = 33, 34
F_IP_LEN, F_IP = 35, 36
F_LABEL1, F_LABEL2 = 37, 38
# Rows with fewer fields than this are skipped by row_to_record().
MIN_FIELDS = 39


def log(msg):
    """Print *msg* to stdout prefixed with a UTC timestamp, flushing at once."""
    print(f"[{time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}] {msg}", flush=True)


def nz(s):
    """Return *s* stripped of surrounding whitespace, or None if empty/None."""
    s = (s or "").strip()
    return s or None


def to_int(s, base=10):
    """Parse *s* as an integer in *base*; return None if blank or malformed."""
    s = nz(s)
    if s is None:
        return None
    try:
        return int(s, base)
    except ValueError:
        return None


def hex_to_int(s):
    """Parse *s* as a hexadecimal integer; return None if blank or malformed."""
    return to_int(s, 16)


def parse_rts(field):
    """The ext-community field looks like 'rt=65010:100 encap=8' — keep the RTs.

    Returns the list of values following each 'rt=' token, or None when the
    field holds no such token.
    """
    rts = [t[3:] for t in (field or "").split() if t.startswith("rt=")]
    return rts or None


def derive_route_type(mac, orig_rtr_ip):
    """Infer the EVPN route type from which fields are populated.

    A MAC means type 2, otherwise an originating router IP means type 3,
    otherwise the row is classified as type 5.
    """
    if mac:
        return 2  # MAC/IP advertisement
    if orig_rtr_ip:
        return 3  # inclusive multicast
    return 5  # IP-prefix


def parse_message(raw):
    """OpenBMP message: 'K: V' header lines, a blank line, then tab-sep rows.

    Returns one list of fields per body line that contains a tab. A missing
    payload (e.g. a Kafka tombstone) or a message without the blank-line
    separator yields an empty list.
    """
    if not raw:
        return []
    text = raw.decode("utf-8", errors="replace")
    if "\n\n" not in text:
        return []
    _, body = text.split("\n\n", 1)
    return [ln.split("\t") for ln in body.splitlines() if "\t" in ln]


def row_to_record(r):
    """Map one split data row onto a dict of evpn_rib column values.

    Returns None when the row has fewer than MIN_FIELDS fields. The extra
    "action" key (lower-cased first field) is not a table column; the main
    loop uses it to tell withdraws ("del") from announcements.
    """
    if len(r) < MIN_FIELDS:
        return None
    mac = nz(r[F_MAC])
    orig_rtr_ip = nz(r[F_ORIG_RTR_IP])
    return {
        "action": r[F_ACTION].strip().lower(),
        "hash_id": r[F_HASH].strip(),
        "peer_hash_id": r[F_PEER_HASH].strip(),
        "base_attr_hash_id": nz(r[F_BASE_ATTR]),
        "rd": r[F_RD].strip() or "0:0",
        "rd_type": to_int(r[F_RD_TYPE]),
        "route_type": derive_route_type(mac, orig_rtr_ip),
        "origin_as": to_int(r[F_ORIGIN_AS]),
        "eth_segment_id": nz(r[F_ESI]),
        "eth_tag_id": hex_to_int(r[F_ETH_TAG]),
        "mac": mac,
        "mac_len": to_int(r[F_MAC_LEN]),
        "ip": nz(r[F_IP]),
        "ip_len": to_int(r[F_IP_LEN]),
        "orig_router_ip": orig_rtr_ip,
        "mpls_label1": to_int(r[F_LABEL1]),
        "mpls_label2": to_int(r[F_LABEL2]),
        "ext_community_list": parse_rts(r[F_EXT_COMM]),
        "path_id": to_int(r[F_PATH_ID]),
        "timestamp": nz(r[F_TIMESTAMP]),
    }


def queue_record(adds, dels, rec):
    """Merge *rec* into the pending batch, keyed by (peer_hash_id, hash_id).

    *adds* and *dels* are dicts mutated in place. Keeping at most one pending
    add per key matters because PostgreSQL rejects an INSERT ... ON CONFLICT
    DO UPDATE that touches the same row twice in one statement. flush() runs
    adds before withdraws, so an announcement arriving after a pending
    withdraw cancels that withdraw; otherwise the stale withdraw would win.
    A withdraw arriving after a pending add keeps both, in that order.
    """
    key = (rec["peer_hash_id"], rec["hash_id"])
    if rec["action"] == "del":
        dels[key] = rec
    else:
        adds[key] = rec
        dels.pop(key, None)


INSERT_COLS = (
    "hash_id", "peer_hash_id", "base_attr_hash_id", "rd", "rd_type", "route_type",
    "origin_as", "eth_segment_id", "eth_tag_id", "mac", "mac_len", "ip", "ip_len",
    "orig_router_ip", "mpls_label1", "mpls_label2", "ext_community_list", "path_id",
    "timestamp",
)
# Upsert keyed on (peer_hash_id, hash_id); a re-announcement clears iswithdrawn.
INSERT_SQL = f"""
INSERT INTO evpn_rib ({", ".join(INSERT_COLS)}, iswithdrawn)
VALUES %s
ON CONFLICT (peer_hash_id, hash_id) DO UPDATE SET
    base_attr_hash_id = EXCLUDED.base_attr_hash_id, rd = EXCLUDED.rd,
    rd_type = EXCLUDED.rd_type, route_type = EXCLUDED.route_type,
    origin_as = EXCLUDED.origin_as, eth_segment_id = EXCLUDED.eth_segment_id,
    eth_tag_id = EXCLUDED.eth_tag_id, mac = EXCLUDED.mac, mac_len = EXCLUDED.mac_len,
    ip = EXCLUDED.ip, ip_len = EXCLUDED.ip_len,
    orig_router_ip = EXCLUDED.orig_router_ip, mpls_label1 = EXCLUDED.mpls_label1,
    mpls_label2 = EXCLUDED.mpls_label2, ext_community_list = EXCLUDED.ext_community_list,
    path_id = EXCLUDED.path_id, timestamp = EXCLUDED.timestamp, iswithdrawn = false
"""
# A withdraw keeps the row and only marks it; it is not a SQL DELETE.
DELETE_SQL = """
UPDATE evpn_rib SET iswithdrawn = true, base_attr_hash_id = NULL, timestamp = %s
WHERE peer_hash_id = %s AND hash_id = %s
"""


def flush(conn, adds, dels):
    """Write one batch to evpn_rib in a single transaction.

    *adds* are upserted first, then *dels* are marked withdrawn. *adds* must
    hold at most one record per (peer_hash_id, hash_id); see queue_record().
    The connection context manager commits on success and rolls back when a
    statement fails, after which the psycopg2 error propagates to the caller.
    """
    if not adds and not dels:
        return
    with conn, conn.cursor() as cur:
        if adds:
            rows = [tuple(rec[c] for c in INSERT_COLS) + (False,) for rec in adds]
            execute_values(cur, INSERT_SQL, rows)
        for rec in dels:
            cur.execute(
                DELETE_SQL, (rec["timestamp"], rec["peer_hash_id"], rec["hash_id"])
            )
    log(f"flushed {len(adds)} add/update, {len(dels)} withdraw")


def close_quietly(conn):
    """Close *conn*, logging instead of raising if the close itself fails."""
    try:
        conn.close()
    except psycopg2.Error as e:
        log(f"ignoring error while closing PostgreSQL connection ({e})")


def connect_pg():
    """Block until PostgreSQL is reachable and evpn_rib is queryable.

    Retries every 5 seconds. Returns a connection with autocommit off and no
    transaction left open.
    """
    while True:
        conn = None
        try:
            conn = psycopg2.connect(PG_DSN)
            conn.autocommit = False
            with conn.cursor() as cur:
                cur.execute("SELECT 1 FROM evpn_rib LIMIT 0")
            # The probe opened a transaction; end it so the session does not
            # sit idle-in-transaction until the first flush.
            conn.rollback()
            log("connected to PostgreSQL; evpn_rib present")
            return conn
        except psycopg2.Error as e:
            if conn is not None:
                # Connected but the probe failed: do not leak one connection
                # per retry.
                close_quietly(conn)
            log(f"PostgreSQL not ready ({e}); retrying in 5s")
            time.sleep(5)


def _on_sigterm(signum, frame):
    """Turn SIGTERM into KeyboardInterrupt so main() shuts down cleanly."""
    raise KeyboardInterrupt


def main():
    """Consume the EVPN topic and write batched changes to PostgreSQL.

    Kafka offsets are committed only after a batch has been written, so a
    failed batch is retried after reconnecting rather than dropped.

    Raises:
        KafkaException: if a polled message carries a Kafka error.
    """
    log(f"starting — kafka={KAFKA_BROKER} topic={TOPIC} group={GROUP_ID}")
    # The container runs this script directly, so `docker stop` delivers
    # SIGTERM to it; route that through the same path as Ctrl-C.
    signal.signal(signal.SIGTERM, _on_sigterm)
    conn = connect_pg()
    consumer = Consumer({
        "bootstrap.servers": KAFKA_BROKER,
        "group.id": GROUP_ID,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    })
    consumer.subscribe([TOPIC])

    adds, dels = {}, {}
    # Monotonic clock: the batch interval must not be affected by wall-clock
    # adjustments.
    last_flush = time.monotonic()
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is not None:
                if msg.error():
                    raise KafkaException(msg.error())
                for row in parse_message(msg.value()):
                    rec = row_to_record(row)
                    if rec is not None:
                        queue_record(adds, dels, rec)

            if not (adds or dels) or time.monotonic() - last_flush < BATCH_SECONDS:
                continue
            try:
                flush(conn, list(adds.values()), list(dels.values()))
            except psycopg2.Error as e:
                log(f"DB write failed ({e}); reconnecting")
                close_quietly(conn)
                conn = connect_pg()
                # The batch is kept and retried on the next loop iteration.
                continue
            consumer.commit(asynchronous=False)
            adds.clear()
            dels.clear()
            last_flush = time.monotonic()
    except KeyboardInterrupt:
        log("shutting down")
    finally:
        consumer.close()
        conn.close()


if __name__ == "__main__":
    sys.exit(main())
b/obmp-evpn-consumer/requirements.txt @@ -0,0 +1,2 @@ +confluent-kafka==2.5.3 +psycopg2-binary==2.9.9