obmp-churn-monitor: a decoupled fast-path BGP churn consumer. It reads openbmp.parsed.unicast_prefix with its own Kafka consumer group and only counts announcements/withdrawals per (router, peer) into churn_metrics (010_churn_metrics.sql) -- no relational RIB writes. Storm-tested: it stayed real-time (tracking 1k->85k msg/s) while the psql-app bulk pipeline's lag grew from 3.8M to 5.6M. The Live BGP Churn dashboard reads it. tools/churn_storm.py: a programmatic churn-storm generator (it flaps GoBGP's eBGP sessions to the lab cores) for load testing. Stress-test finding: a fleet-wide full table from 18 routers exceeds the capacity of this 31 GiB host. The bottleneck is RAM, not CPU -- even with 16 cores the host hit load 33 because it was swap-thrashing (swap 2/2 full, <1.5 GiB free), and lag ran away from 3.8M to 20M+. Recourse: add host RAM for bulk throughput, and rely on the fast-path consumer for churn visibility regardless. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
103 lines
3.8 KiB
Python
103 lines
3.8 KiB
Python
#!/usr/bin/env python3
|
|
"""obmp-churn-monitor -- decoupled fast-path BGP churn-rate monitor.
|
|
|
|
Reads openbmp.parsed.unicast_prefix from Kafka with its OWN consumer group and
|
|
only COUNTS announcements/withdrawals per (router, peer) -- no relational RIB
|
|
maintenance, no per-route DB upserts. Because counting is orders of magnitude
|
|
cheaper than psql-app's work, this stays real-time even when the main
|
|
ingestion pipeline lags minutes behind during a churn storm.
|
|
|
|
It does NOT make the bulk DB write faster -- it guarantees churn *visibility*
|
|
survives a storm the bulk pipeline cannot keep up with. Aggregates flush every
|
|
FLUSH_INTERVAL seconds to churn_metrics (postgres/scripts/010_churn_metrics.sql);
|
|
the Live BGP Churn dashboard reads it.
|
|
|
|
The consumer starts at the topic head (auto.offset.reset=latest, no commits):
|
|
on restart it jumps to current churn rather than replaying a stale backlog.
|
|
"""
|
|
import collections
|
|
import os
|
|
import time
|
|
|
|
import psycopg2
|
|
from confluent_kafka import Consumer
|
|
|
|
# Runtime configuration; every value can be overridden from the environment.
BROKER = os.getenv("KAFKA_BROKER", "obmp-kafka:29092")
TOPIC = os.getenv("CHURN_TOPIC", "openbmp.parsed.unicast_prefix")
# NOTE(review): the fallback DSN embeds lab credentials -- set PG_DSN
# explicitly in any non-lab deployment.
PG_DSN = os.getenv(
    "PG_DSN",
    "host=obmp-psql port=5432 dbname=openbmp user=openbmp password=openbmp",
)
# Seconds between writes of the accumulated counts to churn_metrics.
FLUSH_INTERVAL = int(os.getenv("FLUSH_INTERVAL", "10"))

# Column indexes within a tab-separated openbmp.parsed.unicast_prefix data row:
#   F_ACTION    -- "add" (announce) or "del" (withdraw)
#   F_ROUTER_IP -- BMP router management IP
#   F_PEER_IP   -- BGP peer address
#   F_PEER_ASN  -- BGP peer ASN
F_ACTION, F_ROUTER_IP, F_PEER_IP, F_PEER_ASN = 0, 4, 7, 8
|
|
|
|
|
|
def flush(counts, window):
    """Write accumulated per-(router, peer) counts to churn_metrics.

    Opens a fresh connection to PG_DSN, inserts one churn_metrics row per
    key, commits, and prints a one-line summary. This is best-effort: any
    failure (connect, insert, commit, close) is printed and swallowed so a
    database hiccup never stops the consumer loop. The counts are not retried.

    Args:
        counts: mapping of (router_ip, peer_ip, peer_asn) -> [adds, dels].
            Empty router/peer strings are stored as NULL.
        window: seconds covered by ``counts``. It is used only for the msg/s
            figure in the log line, and is floored at 1s there to avoid
            dividing by zero.
    """
    if not counts:
        return

    rows = [
        (router or None, peer or None, asn, adds, dels)
        for (router, peer, asn), (adds, dels) in counts.items()
    ]
    try:
        conn = psycopg2.connect(PG_DSN)
        try:
            with conn.cursor() as cur:
                cur.executemany(
                    "INSERT INTO churn_metrics (router_ip, peer_ip, peer_asn, "
                    "adds, dels) VALUES (%s,%s,%s,%s,%s)",
                    rows)
            conn.commit()
        finally:
            # Close on every path. Previously the connection leaked whenever
            # execute/commit raised. A failing close is still caught below.
            conn.close()
    except Exception as e:
        print(f"flush failed: {e}", flush=True)
        return

    tot_a = sum(adds for adds, _ in counts.values())
    tot_d = sum(dels for _, dels in counts.values())
    print(f"flush: {len(counts)} sessions, +{tot_a} -{tot_d} "
          f"({(tot_a + tot_d) / max(window, 1):.0f} msg/s) over "
          f"{window:.0f}s", flush=True)
|
|
|
|
|
|
def _tally(text, counts):
    """Add the announce/withdraw rows of one unicast_prefix payload to counts.

    ``text`` holds newline-separated, tab-delimited rows. Only rows starting
    with "add\\t" or "del\\t" are counted; any other line is skipped, as are
    rows too short to reach the peer-ASN column. A non-integer peer ASN is
    recorded as None. ``counts`` must default missing keys to [0, 0]
    (slot 0 = adds, slot 1 = dels).
    """
    for line in text.split("\n"):
        if not line.startswith(("add\t", "del\t")):
            continue
        f = line.split("\t")
        if len(f) <= F_PEER_ASN:
            continue
        try:
            asn = int(f[F_PEER_ASN])
        except ValueError:
            asn = None
        key = (f[F_ROUTER_IP], f[F_PEER_IP], asn)
        counts[key][0 if f[F_ACTION] == "add" else 1] += 1


def main():
    """Consume TOPIC forever, tallying churn and flushing every FLUSH_INTERVAL s.

    The consumer uses its own group with auto.offset.reset=latest and never
    commits offsets, so each start begins at the topic head. If the loop
    exits via an exception (e.g. KeyboardInterrupt), the consumer is closed
    and the partial window is flushed.
    """
    print(f"obmp-churn-monitor: topic={TOPIC}, flush={FLUSH_INTERVAL}s", flush=True)
    consumer = Consumer({
        "bootstrap.servers": BROKER,
        "group.id": "obmp-churn-monitor",
        "auto.offset.reset": "latest",
        "enable.auto.commit": False,
    })
    consumer.subscribe([TOPIC])

    counts = collections.defaultdict(lambda: [0, 0])  # key -> [adds, dels]
    # Monotonic clock: flush intervals must not jump with wall-clock changes.
    last_flush = time.monotonic()
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is not None:
                if msg.error():
                    print(f"kafka error: {msg.error()}", flush=True)
                else:
                    payload = msg.value()
                    # A null-valued record carries no rows; skip it rather
                    # than crash on .decode().
                    if payload is not None:
                        _tally(payload.decode("utf-8", errors="replace"), counts)
            now = time.monotonic()
            if now - last_flush >= FLUSH_INTERVAL:
                flush(counts, now - last_flush)
                counts.clear()
                last_flush = now
    finally:
        # Leave the consumer group promptly, then keep the partial window.
        consumer.close()
        flush(counts, time.monotonic() - last_flush)


if __name__ == "__main__":
    main()
|