obmp-churn-monitor: a decoupled fast-path BGP churn consumer.

Reads openbmp.parsed.unicast_prefix with its own Kafka consumer group and only counts announcements/withdrawals per (router,peer) into churn_metrics (010_churn_metrics.sql) -- no relational RIB write. Storm-tested: it stayed real-time (tracked 1k->85k msg/s) while the psql-app bulk pipeline lag grew 3.8M->5.6M. Live BGP Churn dashboard reads it.

tools/churn_storm.py: programmatic churn-storm generator (flaps GoBGP's eBGP sessions to the lab cores) for load testing.

Stress-test finding: fleet-wide full table from 18 routers exceeds this 31 GiB host. The bottleneck is RAM, not CPU -- at 16 cores the host still hit load 33 because it was swap-thrashing (swap 2/2 full, <1.5 GiB free). Lag ran away 3.8M->20M+. Recourse: more host RAM for bulk throughput; the fast-path consumer for visibility regardless.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
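The fast-path counting described in the commit message can be sketched roughly as below. This is an illustration only, not the obmp-churn-monitor code: the `ChurnCounter` class, the `(action, router, peer)` argument shape, and the `"add"`/`"del"` action labels are assumptions made for the example, and both the Kafka consumption and the write into churn_metrics are left out.

```python
from collections import Counter


class ChurnCounter:
    """Hypothetical per-(router, peer) announcement/withdrawal tally."""

    def __init__(self):
        self.announced = Counter()
        self.withdrawn = Counter()

    def observe(self, action, router, peer):
        # Assumed action labels: "add" = announcement, "del" = withdrawal.
        key = (router, peer)
        if action == "add":
            self.announced[key] += 1
        elif action == "del":
            self.withdrawn[key] += 1
        # Any other action is ignored; no per-prefix state is kept.

    def flush(self):
        """Return {(router, peer): (announcements, withdrawals)} and reset."""
        keys = set(self.announced) | set(self.withdrawn)
        rows = {k: (self.announced[k], self.withdrawn[k]) for k in keys}
        self.announced.clear()
        self.withdrawn.clear()
        return rows
```

Because only two integers are held per (router, peer) pair, memory stays bounded by the number of peers rather than the number of routes, which is consistent with the commit's point that the fast path avoids the relational RIB write.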
65 lines
2.3 KiB
Python
#!/usr/bin/env python3
"""churn_storm.py -- programmatic BGP churn-storm generator for load testing.

Drives churn through the live OpenBMP pipeline by flapping GoBGP's eBGP
sessions to the lab core routers. Each session reset withdraws and re-learns
that core's full table (~1M routes), which propagates fleet-wide through the
route reflectors -- a realistic, large churn event.

Pair it with the Kafka Ingestion Lag dashboard (uid kafka-lag) to measure how
the pipeline copes: peak lag, drain time, and -- by also watching `docker
stats` -- whether the bottleneck is the consumer (psql-app) or the database.

Usage:
    python3 tools/churn_storm.py                      # 1 cycle, all 4 cores
    python3 tools/churn_storm.py --cycles 5 --interval 120
    python3 tools/churn_storm.py --neighbors 10.100.0.100,10.100.0.200
"""
import argparse
import datetime
import subprocess
import time

ALL_CORES = ["10.100.0.100", "10.100.0.200", "10.100.1.100", "10.100.1.200"]


def ts():
    return datetime.datetime.now().strftime("%H:%M:%S")


def reset(neighbor):
    r = subprocess.run(
        ["docker", "exec", "obmp-gobgp", "gobgp", "neighbor", neighbor, "reset"],
        capture_output=True, text=True)
    ok = r.returncode == 0
    detail = "ok" if ok else "FAIL " + (r.stderr or r.stdout).strip()
    print(f" {ts()} reset {neighbor}: {detail}", flush=True)
    return ok


def main():
    ap = argparse.ArgumentParser(description="BGP churn-storm generator")
    ap.add_argument("--neighbors", default=",".join(ALL_CORES),
                    help="comma-separated GoBGP neighbor IPs to flap")
    ap.add_argument("--cycles", type=int, default=1,
                    help="number of flap cycles")
    ap.add_argument("--interval", type=int, default=120,
                    help="seconds between cycles")
    a = ap.parse_args()
    neighbors = [n.strip() for n in a.neighbors.split(",") if n.strip()]

    print(f"{ts()} churn storm: {a.cycles} cycle(s), {len(neighbors)} "
          f"neighbor(s), {a.interval}s interval", flush=True)
    for c in range(1, a.cycles + 1):
        print(f"{ts()} --- cycle {c}/{a.cycles} ---", flush=True)
        for n in neighbors:
            reset(n)
        if c < a.cycles:
            time.sleep(a.interval)
    print(f"{ts()} storm complete -- watch the Kafka Ingestion Lag dashboard",
          flush=True)


if __name__ == "__main__":
    main()
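The script's docstring names peak lag and drain time as the figures to read off the Kafka Ingestion Lag dashboard after a storm. If the lag series is exported as samples, the two numbers could be computed along these lines. This is a sketch under assumptions: `summarize_lag`, the `(seconds, lag)` sample shape, and the `drained_below` threshold are hypothetical and not part of churn_storm.py.

```python
def summarize_lag(samples, drained_below=1000):
    """Return (peak_lag, drain_seconds) from (seconds, lag) samples.

    samples: time-ordered (seconds, lag) pairs (assumed shape).
    drain_seconds: time from the peak sample to the first later sample
    whose lag is <= drained_below, or None if the lag never drained.
    """
    if not samples:
        return 0, None
    # max() returns the first sample that reaches the highest lag.
    peak_t, peak_lag = max(samples, key=lambda s: s[1])
    for t, lag in samples:
        if t > peak_t and lag <= drained_below:
            return peak_lag, t - peak_t
    return peak_lag, None
```

A `None` drain time is the case described in the stress-test finding, where lag kept growing instead of recovering; the threshold should be set to whatever steady-state lag the pipeline shows before the storm.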