From d7084aba54f14289fdfcbb4ab09e0eadcf11e700 Mon Sep 17 00:00:00 2001 From: sam Date: Tue, 19 May 2026 13:17:09 -0700 Subject: [PATCH] Add fast-path churn monitor and churn-storm load tool obmp-churn-monitor: a decoupled fast-path BGP churn consumer. Reads openbmp.parsed.unicast_prefix with its own Kafka consumer group and only counts announcements/withdrawals per (router,peer) into churn_metrics (010_churn_metrics.sql) -- no relational RIB write. Storm-tested: it stayed real-time (tracked 1k->85k msg/s) while the psql-app bulk pipeline lag grew 3.8M->5.6M. Live BGP Churn dashboard reads it. tools/churn_storm.py: programmatic churn-storm generator (flaps GoBGP's eBGP sessions to the lab cores) for load testing. Stress-test finding: fleet-wide full table from 18 routers exceeds this 31 GiB host. The bottleneck is RAM, not CPU -- at 16 cores the host still hit load 33 because it was swap-thrashing (swap 2/2 full, <1.5 GiB free). Lag ran away 3.8M->20M+. Recourse: more host RAM for bulk throughput; the fast-path consumer for visibility regardless. 
Co-Authored-By: Claude Opus 4.7 --- docker-compose.yml | 18 ++++ obmp-churn-monitor/Dockerfile | 8 ++ obmp-churn-monitor/monitor.py | 102 ++++++++++++++++++ obmp-churn-monitor/requirements.txt | 2 + .../dashboards/Telemetry-3001/live_churn.json | 98 +++++++++++++++++ postgres/scripts/010_churn_metrics.sql | 30 ++++++ tools/churn_storm.py | 64 +++++++++++ 7 files changed, 322 insertions(+) create mode 100644 obmp-churn-monitor/Dockerfile create mode 100644 obmp-churn-monitor/monitor.py create mode 100644 obmp-churn-monitor/requirements.txt create mode 100644 obmp-grafana/dashboards/Telemetry-3001/live_churn.json create mode 100644 postgres/scripts/010_churn_metrics.sql create mode 100644 tools/churn_storm.py diff --git a/docker-compose.yml b/docker-compose.yml index 3763a64..eb15ee2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -520,6 +520,24 @@ services: - LAG_POLL_INTERVAL=30 - CONSUMER_GROUPS=obmp-psql-consumer,evpn-psql + # Decoupled fast-path BGP churn monitor. Reads openbmp.parsed.unicast_prefix + # with its own consumer group and only counts announcements/withdrawals -- + # stays real-time during a churn storm even while psql-app lags, because + # counting is far cheaper than the relational RIB write. Featherweight. + churn-monitor: + restart: unless-stopped + container_name: obmp-churn-monitor + build: + context: ./obmp-churn-monitor + depends_on: + - kafka + - psql + environment: + - KAFKA_BROKER=obmp-kafka:29092 + - PG_DSN=host=obmp-psql port=5432 dbname=openbmp user=openbmp password=${POSTGRES_PASSWORD:-openbmp} + - CHURN_TOPIC=openbmp.parsed.unicast_prefix + - FLUSH_INTERVAL=10 + whois: restart: unless-stopped container_name: obmp-whois diff --git a/obmp-churn-monitor/Dockerfile b/obmp-churn-monitor/Dockerfile new file mode 100644 index 0000000..96fb4ef --- /dev/null +++ b/obmp-churn-monitor/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.12-slim + +WORKDIR /app +COPY requirements.txt . 
+RUN pip install --no-cache-dir -r requirements.txt
+COPY monitor.py .
+
+CMD ["python", "-u", "monitor.py"]
diff --git a/obmp-churn-monitor/monitor.py b/obmp-churn-monitor/monitor.py
new file mode 100644
index 0000000..4de6750
--- /dev/null
+++ b/obmp-churn-monitor/monitor.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python3
+"""obmp-churn-monitor -- decoupled fast-path BGP churn-rate monitor.
+
+Reads openbmp.parsed.unicast_prefix from Kafka with its OWN consumer group and
+only COUNTS announcements/withdrawals per (router, peer) -- no relational RIB
+maintenance, no per-route DB upserts. Because counting is orders of magnitude
+cheaper than psql-app's work, this stays real-time even when the main
+ingestion pipeline lags minutes behind during a churn storm.
+
+It does NOT make the bulk DB write faster -- it guarantees churn *visibility*
+survives a storm the bulk pipeline cannot keep up with. Aggregates flush every
+FLUSH_INTERVAL seconds to churn_metrics (postgres/scripts/010_churn_metrics.sql);
+the Live BGP Churn dashboard reads it.
+
+The consumer starts at the end of the topic (auto.offset.reset=latest, no
+offset commits): on restart it jumps to current churn rather than replaying
+a stale backlog.
+"""
+import collections
+import os
+import time
+
+import psycopg2
+from confluent_kafka import Consumer
+
+BROKER = os.environ.get("KAFKA_BROKER", "obmp-kafka:29092")
+TOPIC = os.environ.get("CHURN_TOPIC", "openbmp.parsed.unicast_prefix")
+PG_DSN = os.environ.get(
+    "PG_DSN",
+    "host=obmp-psql port=5432 dbname=openbmp user=openbmp password=openbmp",
+)
+FLUSH_INTERVAL = int(os.environ.get("FLUSH_INTERVAL", "10"))
+
+# Tab-separated field positions in an openbmp.parsed.unicast_prefix data row.
+F_ACTION = 0  # "add" (announce) or "del" (withdraw)
+F_ROUTER_IP = 4  # BMP router management IP
+F_PEER_IP = 7  # BGP peer address
+F_PEER_ASN = 8  # BGP peer ASN
+
+INSERT_SQL = (
+    "INSERT INTO churn_metrics (router_ip, peer_ip, peer_asn, "
+    "adds, dels) VALUES (%s,%s,%s,%s,%s)"
+)
+
+
+def flush(counts, window):
+    """Write accumulated per-(router,peer) counts to churn_metrics.
+
+    counts maps (router_ip, peer_ip, peer_asn) -> [adds, dels]; window is the
+    number of seconds those counts cover and only feeds the log line.
+
+    Best-effort: a failure is logged and swallowed so a database outage never
+    stops the consumer. main() clears counts after every call, so a window
+    that fails to flush is dropped rather than retried.
+    """
+    if not counts:
+        return
+    rows = [(router or None, peer or None, asn, adds, dels)
+            for (router, peer, asn), (adds, dels) in counts.items()]
+    try:
+        conn = psycopg2.connect(PG_DSN)
+        try:
+            with conn.cursor() as cur:
+                cur.executemany(INSERT_SQL, rows)
+            conn.commit()
+        finally:
+            # Close on failure too; otherwise each failed flush leaks a
+            # connection.
+            conn.close()
+    except Exception as e:
+        print(f"flush failed: {e}", flush=True)
+        return
+    tot_a = sum(r[3] for r in rows)
+    tot_d = sum(r[4] for r in rows)
+    print(f"flush: {len(rows)} sessions, +{tot_a} -{tot_d} "
+          f"({(tot_a + tot_d) / max(window, 1):.0f} msg/s) over "
+          f"{window:.0f}s", flush=True)
+
+
+def _tally(text, counts):
+    """Add every add/del data row in one message payload to counts."""
+    for line in text.split("\n"):
+        if not line.startswith(("add\t", "del\t")):
+            continue  # not an add/del data row
+        f = line.split("\t")
+        if len(f) <= F_PEER_ASN:
+            continue  # too few fields to identify the session
+        try:
+            asn = int(f[F_PEER_ASN])
+        except ValueError:
+            asn = None
+        slot = 0 if f[F_ACTION] == "add" else 1
+        counts[(f[F_ROUTER_IP], f[F_PEER_IP], asn)][slot] += 1
+
+
+def main():
+    """Consume the churn topic and flush counts every FLUSH_INTERVAL seconds."""
+    print(f"obmp-churn-monitor: topic={TOPIC}, flush={FLUSH_INTERVAL}s", flush=True)
+    consumer = Consumer({
+        "bootstrap.servers": BROKER,
+        "group.id": "obmp-churn-monitor",
+        "auto.offset.reset": "latest",
+        "enable.auto.commit": False,
+    })
+    consumer.subscribe([TOPIC])
+
+    counts = collections.defaultdict(lambda: [0, 0])  # key -> [adds, dels]
+    # Monotonic clock: a wall-clock step must not stretch or skip a window.
+    last_flush = time.monotonic()
+    try:
+        while True:
+            msg = consumer.poll(1.0)
+            if msg is not None:
+                if msg.error():
+                    # Surface broker/partition errors instead of hiding them.
+                    print(f"kafka error: {msg.error()}", flush=True)
+                elif msg.value():
+                    _tally(msg.value().decode("utf-8", errors="replace"),
+                           counts)
+            now = time.monotonic()
+            if now - last_flush >= FLUSH_INTERVAL:
+                flush(counts, now - last_flush)
+                counts.clear()
+                last_flush = now
+    except KeyboardInterrupt:
+        print("obmp-churn-monitor: interrupted, shutting down", flush=True)
+    finally:
+        # Leave the consumer group promptly instead of waiting for the
+        # session timeout.
+        consumer.close()
+
+
+if __name__ == "__main__":
+    main()
diff --git 
a/obmp-churn-monitor/requirements.txt b/obmp-churn-monitor/requirements.txt new file mode 100644 index 0000000..6c0eb7b --- /dev/null +++ b/obmp-churn-monitor/requirements.txt @@ -0,0 +1,2 @@ +confluent-kafka==2.5.3 +psycopg2-binary==2.9.9 diff --git a/obmp-grafana/dashboards/Telemetry-3001/live_churn.json b/obmp-grafana/dashboards/Telemetry-3001/live_churn.json new file mode 100644 index 0000000..5fe8828 --- /dev/null +++ b/obmp-grafana/dashboards/Telemetry-3001/live_churn.json @@ -0,0 +1,98 @@ +{ + "annotations": {"list": [{"builtIn": 1,"datasource": {"type": "datasource","uid": "grafana"},"enable": true,"hide": true,"iconColor": "rgba(0, 211, 255, 1)","name": "Annotations & Alerts","type": "dashboard"}]}, + "description": "Real-time BGP churn rate from the obmp-churn-monitor fast-path consumer. This consumer reads Kafka with its own group and only counts announcements/withdrawals, so it stays current even when the main psql-app ingestion pipeline lags minutes behind during a churn storm. 
Use the Kafka Ingestion Lag dashboard alongside this: when lag is high, THIS dashboard is still telling you what is churning.", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [{"asDropdown": true,"icon": "external link","includeVars": true,"keepTime": true,"tags": ["obmp-nav"],"title": "OBMP Dashboards","type": "dashboards"}], + "liveNow": true, + "panels": [ + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Total churn events (announcements + withdrawals) in the last minute.", + "fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "yellow","value": 100000},{"color": "red","value": 1000000}]}},"overrides": []}, + "gridPos": {"h": 4,"w": 6,"x": 0,"y": 0}, + "id": 1, + "options": {"colorMode": "background","graphMode": "area","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT COALESCE(sum(adds + dels),0) AS \"Churn (1m)\" FROM churn_metrics WHERE ts > now() - interval '1 minute'","refId": "A"}], + "title": "Churn Events (last min)","type": "stat" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Route announcements in the last minute.", + "fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "green","value": null}]}},"overrides": []}, + "gridPos": {"h": 4,"w": 6,"x": 6,"y": 0}, + "id": 2, + "options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": 
"postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT COALESCE(sum(adds),0) AS \"Announcements\" FROM churn_metrics WHERE ts > now() - interval '1 minute'","refId": "A"}], + "title": "Announcements (last min)","type": "stat" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Route withdrawals in the last minute.", + "fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "orange","value": 1}]}},"overrides": []}, + "gridPos": {"h": 4,"w": 6,"x": 12,"y": 0}, + "id": 3, + "options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT COALESCE(sum(dels),0) AS \"Withdrawals\" FROM churn_metrics WHERE ts > now() - interval '1 minute'","refId": "A"}], + "title": "Withdrawals (last min)","type": "stat" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Distinct BGP sessions with churn in the last minute.", + "fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "purple","value": null}]}},"overrides": []}, + "gridPos": {"h": 4,"w": 6,"x": 18,"y": 0}, + "id": 4, + "options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT count(*) AS \"Sessions\" FROM (SELECT router_ip, peer_ip FROM churn_metrics WHERE ts > now() - interval '1 minute' AND (adds > 0 OR dels > 0) GROUP BY router_ip, 
peer_ip) s","refId": "A"}], + "title": "Churning Sessions","type": "stat" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "BGP churn rate over time -- announcements vs withdrawals per minute. This stays live during a storm even while the Kafka Ingestion Lag dashboard shows the bulk pipeline backed up.", + "fieldConfig": {"defaults": {"color": {"mode": "palette-classic"},"custom": {"axisPlacement": "auto","drawStyle": "line","fillOpacity": 25,"lineInterpolation": "smooth","lineWidth": 1,"showPoints": "never","spanNulls": true},"unit": "short"},"overrides": [{"matcher": {"id": "byName","options": "Withdrawals"},"properties": [{"id": "color","value": {"fixedColor": "red","mode": "fixed"}}]},{"matcher": {"id": "byName","options": "Announcements"},"properties": [{"id": "color","value": {"fixedColor": "green","mode": "fixed"}}]}]}, + "gridPos": {"h": 9,"w": 12,"x": 0,"y": 4}, + "id": 5, + "options": {"legend": {"calcs": ["max","sum"],"displayMode": "table","placement": "bottom","showLegend": true},"tooltip": {"mode": "multi","sort": "desc"}}, + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "time_series","rawSql": "SELECT $__timeGroupAlias(ts,'1m'), sum(adds) AS \"Announcements\", sum(dels) AS \"Withdrawals\" FROM churn_metrics WHERE $__timeFilter(ts) GROUP BY 1 ORDER BY 1","refId": "A"}], + "title": "Churn Rate (per minute)","type": "timeseries" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Churn per minute broken down by the BMP router reporting it.", + "fieldConfig": {"defaults": {"color": {"mode": "palette-classic"},"custom": {"axisPlacement": "auto","drawStyle": "line","fillOpacity": 20,"lineInterpolation": "smooth","lineWidth": 1,"showPoints": "never","spanNulls": true,"stacking": {"group": "A","mode": "normal"}},"unit": "short"},"overrides": []}, + "gridPos": {"h": 9,"w": 12,"x": 12,"y": 4}, + "id": 6, + "options": {"legend": {"calcs": 
["sum"],"displayMode": "table","placement": "bottom","showLegend": true},"tooltip": {"mode": "multi","sort": "desc"}}, + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "time_series","rawSql": "SELECT $__timeGroupAlias(ts,'1m'), COALESCE(host(router_ip),'(unknown)') AS metric, sum(adds + dels) AS churn FROM churn_metrics WHERE $__timeFilter(ts) GROUP BY 1, router_ip ORDER BY 1","refId": "A"}], + "title": "Churn by Router","type": "timeseries" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Busiest BGP sessions by churn over the dashboard time range.", + "fieldConfig": {"defaults": {"custom": {"align": "auto","displayMode": "auto"}},"overrides": [{"matcher": {"id": "byName","options": "Withdraws"},"properties": [{"id": "custom.displayMode","value": "color-text"},{"id": "thresholds","value": {"mode": "absolute","steps": [{"color": "text","value": null},{"color": "orange","value": 1}]}}]}]}, + "gridPos": {"h": 9,"w": 24,"x": 0,"y": 13}, + "id": 7, + "options": {"showHeader": true,"sortBy": [{"desc": true,"displayName": "Total Churn"}]}, + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT host(router_ip) AS \"Router\", host(peer_ip) AS \"Peer\", peer_asn AS \"Peer AS\", sum(adds) AS \"Announces\", sum(dels) AS \"Withdraws\", sum(adds + dels) AS \"Total Churn\" FROM churn_metrics WHERE $__timeFilter(ts) GROUP BY router_ip, peer_ip, peer_asn ORDER BY \"Total Churn\" DESC LIMIT 20","refId": "A"}], + "title": "Top Churning Sessions","type": "table" + } + ], + "refresh": "10s", + "schemaVersion": 36, + "style": "dark", + "tags": ["obmp", "obmp-nav", "telemetry", "bgp"], + "templating": {"list": []}, + "time": {"from": "now-1h","to": "now"}, + "timepicker": {}, + "timezone": "", + "title": "Live BGP Churn", + "uid": "live-churn", + "version": 1, + "weekStart": "" +} diff --git a/postgres/scripts/010_churn_metrics.sql 
b/postgres/scripts/010_churn_metrics.sql
new file mode 100644
index 0000000..ba5f12f
--- /dev/null
+++ b/postgres/scripts/010_churn_metrics.sql
@@ -0,0 +1,30 @@
+-- 010_churn_metrics.sql
+-- Fast-path BGP churn metrics, written by the obmp-churn-monitor service.
+--
+-- obmp-churn-monitor reads openbmp.parsed.unicast_prefix from Kafka with its
+-- own consumer group and only COUNTS announcements/withdrawals per
+-- (router, peer) -- no relational RIB maintenance. Because counting is far
+-- cheaper than psql-app's per-route upserts, it stays real-time even when the
+-- main ingestion pipeline lags minutes behind under a churn storm. This is the
+-- decoupled "visibility path": it does not speed up the bulk DB write, it
+-- guarantees churn visibility survives a storm the bulk pipeline cannot.
+
+CREATE TABLE IF NOT EXISTS churn_metrics (
+    ts          timestamptz NOT NULL DEFAULT now(),
+    router_ip   inet,
+    peer_ip     inet,
+    peer_asn    bigint,
+    adds        integer,
+    dels        integer
+);
+
+SELECT create_hypertable('churn_metrics', 'ts', if_not_exists => TRUE);
+
+CREATE INDEX IF NOT EXISTS idx_churn_ts ON churn_metrics (ts DESC);
+CREATE INDEX IF NOT EXISTS idx_churn_router_ts ON churn_metrics (router_ip, ts DESC);
+
+DO $$ BEGIN
+    PERFORM add_retention_policy('churn_metrics', INTERVAL '7 days');
+EXCEPTION WHEN OTHERS THEN
+    RAISE NOTICE 'churn_metrics retention policy not added: %', SQLERRM;
+END $$;
diff --git a/tools/churn_storm.py b/tools/churn_storm.py
new file mode 100644
index 0000000..e9da3eb
--- /dev/null
+++ b/tools/churn_storm.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3
+"""churn_storm.py -- programmatic BGP churn-storm generator for load testing.
+
+Drives churn through the live OpenBMP pipeline by flapping GoBGP's eBGP
+sessions to the lab core routers. Each session reset withdraws and re-learns
+that core's full table (~1M routes), which propagates fleet-wide through the
+route reflectors -- a realistic, large churn event.
+
+Pair it with the Kafka Ingestion Lag dashboard (uid kafka-lag) to measure how
+the pipeline copes: peak lag, drain time, and -- by also watching `docker
+stats` -- whether the bottleneck is the consumer (psql-app) or the database.
+
+Usage:
+    python3 tools/churn_storm.py                       # 1 cycle, all 4 cores
+    python3 tools/churn_storm.py --cycles 5 --interval 120
+    python3 tools/churn_storm.py --neighbors 10.100.0.100,10.100.0.200
+"""
+import argparse
+import datetime
+import subprocess
+import time
+
+ALL_CORES = ["10.100.0.100", "10.100.0.200", "10.100.1.100", "10.100.1.200"]
+RESET_TIMEOUT = 60  # seconds allowed for one `gobgp neighbor ... reset`
+
+
+def ts():
+    """Return the current local time as HH:MM:SS, used to prefix log lines."""
+    return datetime.datetime.now().strftime("%H:%M:%S")
+
+
+def reset(neighbor):
+    """Reset one GoBGP neighbor session and report whether it succeeded.
+
+    Runs `gobgp neighbor <neighbor> reset` inside the obmp-gobgp container.
+    A missing docker binary or a command that hangs past RESET_TIMEOUT is
+    logged as a failed reset instead of aborting the storm with a traceback.
+    """
+    cmd = ["docker", "exec", "obmp-gobgp", "gobgp", "neighbor", neighbor,
+           "reset"]
+    try:
+        r = subprocess.run(cmd, capture_output=True, text=True,
+                           timeout=RESET_TIMEOUT)
+    except (OSError, subprocess.TimeoutExpired) as e:
+        print(f"  {ts()} reset {neighbor}: FAIL {e}", flush=True)
+        return False
+    ok = r.returncode == 0
+    detail = "ok" if ok else "FAIL " + (r.stderr or r.stdout).strip()
+    print(f"  {ts()} reset {neighbor}: {detail}", flush=True)
+    return ok
+
+
+def main():
+    """Parse arguments and run the requested number of flap cycles."""
+    ap = argparse.ArgumentParser(description="BGP churn-storm generator")
+    ap.add_argument("--neighbors", default=",".join(ALL_CORES),
+                    help="comma-separated GoBGP neighbor IPs to flap")
+    ap.add_argument("--cycles", type=int, default=1,
+                    help="number of flap cycles")
+    ap.add_argument("--interval", type=int, default=120,
+                    help="seconds between cycles")
+    a = ap.parse_args()
+    if a.interval < 0:
+        # time.sleep() rejects negatives; fail before flapping anything.
+        ap.error("--interval must be >= 0")
+    neighbors = [n.strip() for n in a.neighbors.split(",") if n.strip()]
+
+    print(f"{ts()} churn storm: {a.cycles} cycle(s), {len(neighbors)} "
+          f"neighbor(s), {a.interval}s interval", flush=True)
+    failed = 0
+    for c in range(1, a.cycles + 1):
+        print(f"{ts()} --- cycle {c}/{a.cycles} ---", flush=True)
+        for n in neighbors:
+            if not reset(n):
+                failed += 1
+        if c < a.cycles:
+            time.sleep(a.interval)
+    if failed:
+        print(f"{ts()} {failed} reset(s) failed", flush=True)
+    print(f"{ts()} storm complete -- watch the Kafka Ingestion Lag dashboard",
+          flush=True)
+
+
+if __name__ == "__main__":
+    main()