103 lines
3.8 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""obmp-churn-monitor -- decoupled fast-path BGP churn-rate monitor.
Reads openbmp.parsed.unicast_prefix from Kafka with its OWN consumer group and
only COUNTS announcements/withdrawals per (router, peer) -- no relational RIB
maintenance, no per-route DB upserts. Because counting is orders of magnitude
cheaper than psql-app's work, this stays real-time even when the main
ingestion pipeline lags minutes behind during a churn storm.
It does NOT make the bulk DB write faster -- it guarantees churn *visibility*
survives a storm the bulk pipeline cannot keep up with. Aggregates flush every
FLUSH_INTERVAL seconds to churn_metrics (postgres/scripts/010_churn_metrics.sql);
the Live BGP Churn dashboard reads it.
The consumer starts at the topic head (auto.offset.reset=latest, no commits):
on restart it jumps to current churn rather than replaying a stale backlog.
"""
import collections
import os
import time
import psycopg2
from confluent_kafka import Consumer
# Runtime settings; every one can be overridden through the environment.
BROKER = os.getenv("KAFKA_BROKER", "obmp-kafka:29092")
TOPIC = os.getenv("CHURN_TOPIC", "openbmp.parsed.unicast_prefix")
PG_DSN = os.getenv(
    "PG_DSN",
    "host=obmp-psql port=5432 dbname=openbmp user=openbmp password=openbmp",
)
# Seconds between writes of the aggregated counters to churn_metrics.
FLUSH_INTERVAL = int(os.getenv("FLUSH_INTERVAL", "10"))

# Column indices inside one tab-separated data row of an
# openbmp.parsed.unicast_prefix message.
F_ACTION = 0      # "add" = announcement, "del" = withdrawal
F_ROUTER_IP = 4   # management IP of the BMP-speaking router
F_PEER_IP = 7     # address of the BGP peer
F_PEER_ASN = 8    # ASN of the BGP peer
def flush(counts, window):
    """Write accumulated per-(router, peer) counts to churn_metrics.

    counts -- mapping of (router_ip, peer_ip, peer_asn) -> [adds, dels].
              Empty router/peer strings are stored as NULL.
    window -- length of the aggregation window in seconds; used only for
              the msg/s figure in the log line (clamped to at least 1 s).

    Best-effort: any failure (connect, insert, commit) is printed as
    "flush failed: ..." instead of being raised, so a database outage does
    not stop the caller. counts itself is never modified here. The
    connection is closed on every path, so an uncommitted transaction is
    discarded on error.
    """
    if not counts:
        return
    try:
        rows = [
            (router or None, peer or None, asn, adds, dels)
            for (router, peer, asn), (adds, dels) in counts.items()
        ]
        conn = psycopg2.connect(PG_DSN)
        try:
            with conn.cursor() as cur:
                cur.executemany(
                    "INSERT INTO churn_metrics (router_ip, peer_ip, peer_asn, "
                    "adds, dels) VALUES (%s,%s,%s,%s,%s)",
                    rows,
                )
            conn.commit()
        finally:
            # Previously the connection leaked whenever execute/commit raised.
            conn.close()
    except Exception as e:
        print(f"flush failed: {e}", flush=True)
        return
    tot_a = sum(adds for adds, _ in counts.values())
    tot_d = sum(dels for _, dels in counts.values())
    print(f"flush: {len(counts)} sessions, +{tot_a} -{tot_d} "
          f"({(tot_a + tot_d) / max(window, 1):.0f} msg/s) over "
          f"{window:.0f}s", flush=True)
def _count_lines(text, counts):
    """Tally the add/del data rows of one decoded openbmp message into counts.

    Lines not starting with "add\\t" or "del\\t" (e.g. message headers) are
    skipped, as are rows too short to contain the peer-ASN column.
    counts maps (router_ip, peer_ip, peer_asn) -> [adds, dels] and must
    create missing entries as [0, 0] (e.g. a collections.defaultdict).
    peer_asn is None when that column is not an integer.
    """
    for line in text.split("\n"):
        if not line.startswith(("add\t", "del\t")):
            continue
        fields = line.split("\t")
        if len(fields) <= F_PEER_ASN:
            continue
        try:
            asn = int(fields[F_PEER_ASN])
        except ValueError:
            asn = None
        tally = counts[(fields[F_ROUTER_IP], fields[F_PEER_IP], asn)]
        if fields[F_ACTION] == "add":
            tally[0] += 1
        else:
            tally[1] += 1


def main():
    """Consume TOPIC indefinitely, flushing churn counts every FLUSH_INTERVAL s.

    Offsets are never committed and auto.offset.reset is "latest", so each
    start begins at the topic head rather than replaying a backlog.
    """
    print(f"obmp-churn-monitor: topic={TOPIC}, flush={FLUSH_INTERVAL}s", flush=True)
    consumer = Consumer({
        "bootstrap.servers": BROKER,
        "group.id": "obmp-churn-monitor",
        "auto.offset.reset": "latest",
        "enable.auto.commit": False,
    })
    consumer.subscribe([TOPIC])
    counts = collections.defaultdict(lambda: [0, 0])  # key -> [adds, dels]
    # Monotonic clock: wall-clock/NTP steps would otherwise stretch, shrink
    # or stall the flush window.
    last_flush = time.monotonic()
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is not None:
                if msg.error():
                    # Previously dropped silently; surface broker/topic errors.
                    print(f"kafka error: {msg.error()}", flush=True)
                else:
                    value = msg.value()
                    # A null payload used to crash the loop on .decode().
                    if value is not None:
                        _count_lines(
                            value.decode("utf-8", errors="replace"), counts)
            now = time.monotonic()
            if now - last_flush >= FLUSH_INTERVAL:
                flush(counts, now - last_flush)
                counts.clear()
                last_flush = now
    finally:
        # Leave the consumer group cleanly on shutdown (no commit happens,
        # since enable.auto.commit is False).
        consumer.close()


if __name__ == "__main__":
    main()