Policy Diff (roadmap E2 follow-up): obmp-rib-poller pulls per-router post-policy accepted/advertised prefix counts and route-policy bindings over CLI+NETCONF (BMP on XRv9000 24.3.1 carries only pre-policy Adj-RIB-In). New tables in 008_obmp_policy_diff.sql; Policy Diff dashboard joins them against BMP ip_rib for received-vs-kept-vs-rejected. GoBGP fleet-wide feed: GoBGP re-advertises the full Bromirski table to both labs' core routers (CML AS65020, PROX AS65021) over eBGP; as route reflectors the cores propagate it to every R9K client, so all 18 lab routers carry and BMP-export a full table -- an intentional stress test of the ingestion/storage path. cml/gobgp_peering_config.py applies and rolls back the core-side config; gobgp/README.md documents the rollback. Kafka lag monitoring: kafka-lag-monitor samples consumer-group lag every 30s into TimescaleDB (009_kafka_lag.sql); Kafka Ingestion Lag dashboard gives visibility into the pipeline under churn load. Peer Detail dashboard: the Peer selector is now router-qualified (router -> peer) so it is unambiguous in an iBGP route-reflector mesh. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
97 lines
3.8 KiB
Python
97 lines
3.8 KiB
Python
#!/usr/bin/env python3
|
|
"""kafka-lag-monitor -- samples Kafka consumer-group lag into PostgreSQL.
|
|
|
|
Every LAG_POLL_INTERVAL seconds it records, per (group, topic, partition), the
|
|
committed offset, log-end offset and lag, plus the group's active member
|
|
count. The Kafka Lag dashboard reads kafka_consumer_lag / kafka_consumer_members
|
|
(postgres/scripts/009_kafka_lag.sql) so the ingestion path can be sanity-
|
|
checked -- watch lag spike during a BGP convergence storm and drain again, and
|
|
confirm the member count when psql-app is scaled out.
|
|
"""
|
|
import os
|
|
import time
|
|
import traceback
|
|
|
|
import psycopg2
|
|
from confluent_kafka import Consumer, TopicPartition, ConsumerGroupTopicPartitions
|
|
from confluent_kafka.admin import AdminClient
|
|
|
|
# Kafka bootstrap server used by both the AdminClient and the probe Consumer.
BROKER = os.environ.get("KAFKA_BROKER", "obmp-kafka:29092")
# libpq-style DSN for the database holding kafka_consumer_lag /
# kafka_consumer_members. NOTE(review): the default embeds the lab's default
# credentials; override PG_DSN outside the lab.
PG_DSN = os.environ.get(
    "PG_DSN",
    "host=obmp-psql port=5432 dbname=openbmp user=openbmp password=openbmp",
)
# Seconds between sampling cycles. Clamped to >= 1: with 0 or a negative value
# the main loop would never sleep and would poll Kafka and PostgreSQL
# back-to-back.
INTERVAL = max(1, int(os.environ.get("LAG_POLL_INTERVAL", "30")))
# Comma-separated consumer groups to sample; surrounding whitespace and empty
# entries (e.g. a trailing comma) are ignored.
GROUPS = [g.strip() for g in os.environ.get(
    "CONSUMER_GROUPS", "obmp-psql-consumer,evpn-psql").split(",") if g.strip()]
|
|
|
|
|
|
def sample_group(admin, consumer, group):
    """Return (lag_rows, member_count) for one consumer group.

    lag_rows: [(group, topic, partition, committed, log_end, lag), ...], one
    row per partition returned by the group's committed-offset listing.
    lag is log_end - committed, floored at 0. A partition is skipped (and
    logged) rather than recorded with a made-up value when:
      * the offset listing reported a per-partition error, or
      * its log-end offset could not be fetched (error or timeout).

    member_count is len(members) from describe_consumer_groups, or None if
    that call failed; the caller skips the member insert in that case.

    Raises whatever the list_consumer_group_offsets future raises (e.g. on
    timeout); the caller logs it and moves on to the next group.
    """
    futs = admin.list_consumer_group_offsets(
        [ConsumerGroupTopicPartitions(group)])
    offsets = futs[group].result(timeout=30)

    rows = []
    for tp in offsets.topic_partitions:
        where = f"{tp.topic}[{tp.partition}]"
        if tp.error is not None:
            # The per-partition offset is not trustworthy when an error is set.
            print(f"  [{group}] {where} offset error: {tp.error}", flush=True)
            continue
        # Negative offsets are "no committed offset" sentinels; count from 0.
        if tp.offset is not None and tp.offset >= 0:
            committed = tp.offset
        else:
            committed = 0
        try:
            watermarks = consumer.get_watermark_offsets(
                TopicPartition(tp.topic, tp.partition), timeout=10, cached=False)
        except Exception as e:
            print(f"  [{group}] {where} watermark fetch failed: {e}", flush=True)
            continue
        if watermarks is None:
            # get_watermark_offsets returns None (rather than raising) on
            # timeout; unpacking it would raise TypeError.
            print(f"  [{group}] {where} watermark fetch timed out", flush=True)
            continue
        log_end = watermarks[1]  # high watermark = offset of next message
        rows.append((group, tp.topic, tp.partition, committed, log_end,
                     max(log_end - committed, 0)))

    # Member count is best-effort: a describe failure must not lose the lag rows.
    members = None
    try:
        desc = admin.describe_consumer_groups([group])[group].result(timeout=30)
        members = len(desc.members)
    except Exception as e:
        print(f"  [{group}] describe failed: {e}", flush=True)
    return rows, members
|
|
|
|
|
|
def _sample_all(admin, consumer):
    """Sample every group in GROUPS; return [(group, rows, members), ...].

    A group whose sampling raises is logged and left out of the result.
    """
    samples = []
    for group in GROUPS:
        try:
            rows, members = sample_group(admin, consumer, group)
        except Exception as e:
            print(f"  [{group}] sample failed: {e}", flush=True)
            continue
        total = sum(r[5] for r in rows)
        print(f"  [{group}] {len(rows)} partitions, "
              f"total lag={total}, members={members}", flush=True)
        samples.append((group, rows, members))
    return samples


def _store_samples(samples):
    """Write one cycle's samples to PostgreSQL in a single transaction.

    The connection is always closed, even when an INSERT or the commit raises.
    Closing without commit discards the partial transaction.
    """
    if not samples:
        return
    conn = psycopg2.connect(PG_DSN)
    try:
        with conn.cursor() as cur:
            for group, rows, members in samples:
                cur.executemany(
                    "INSERT INTO kafka_consumer_lag (group_id, topic, "
                    "partition, committed, log_end, lag) "
                    "VALUES (%s,%s,%s,%s,%s,%s)", rows)
                if members is not None:
                    cur.execute(
                        "INSERT INTO kafka_consumer_members (group_id, "
                        "members) VALUES (%s,%s)", (group, members))
        conn.commit()
    finally:
        conn.close()


def main():
    """Sample consumer-group lag every INTERVAL seconds and store it; never returns.

    Each cycle first samples all groups from Kafka, then opens a short-lived
    PostgreSQL connection to store the results, so the DB connection is not
    held during slow Kafka calls. A failing cycle is logged with a traceback
    and the loop continues.
    """
    print(f"kafka-lag-monitor starting; broker={BROKER}, groups={GROUPS}, "
          f"interval={INTERVAL}s", flush=True)
    admin = AdminClient({"bootstrap.servers": BROKER})
    # Probe consumer used only for watermark (log-end offset) queries; it
    # never commits offsets.
    consumer = Consumer({"bootstrap.servers": BROKER,
                         "group.id": "kafka-lag-monitor-probe",
                         "enable.auto.commit": False})
    try:
        while True:
            # Monotonic clock: wall-clock jumps (NTP) must not skew the interval.
            start = time.monotonic()
            try:
                _store_samples(_sample_all(admin, consumer))
            except Exception as e:
                print(f"cycle failed: {e}", flush=True)
                traceback.print_exc()
            time.sleep(max(0, INTERVAL - (time.monotonic() - start)))
    finally:
        # Release the consumer's broker connections on exit (e.g. Ctrl-C).
        consumer.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Start the sampling loop only when run as a script, not when imported.
    raise SystemExit(main())
|