97 lines
3.8 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""kafka-lag-monitor -- samples Kafka consumer-group lag into PostgreSQL.
Every LAG_POLL_INTERVAL seconds it records, per (group, topic, partition), the
committed offset, log-end offset and lag, plus the group's active member
count. The Kafka Lag dashboard reads kafka_consumer_lag / kafka_consumer_members
(postgres/scripts/009_kafka_lag.sql) so the ingestion path can be sanity-
checked -- watch lag spike during a BGP convergence storm and drain again, and
confirm the member count when psql-app is scaled out.
"""
import os
import time
import traceback
import psycopg2
from confluent_kafka import Consumer, TopicPartition, ConsumerGroupTopicPartitions
from confluent_kafka.admin import AdminClient
# Runtime configuration, read once at import time from the environment.

# Kafka bootstrap address used by both the admin client and the probe consumer.
BROKER = os.getenv("KAFKA_BROKER", "obmp-kafka:29092")

# Connection string handed to psycopg2.connect() at the start of every cycle.
PG_DSN = os.getenv(
    "PG_DSN",
    "host=obmp-psql port=5432 dbname=openbmp user=openbmp password=openbmp",
)

# Target number of seconds between the starts of two sampling cycles.
INTERVAL = int(os.getenv("LAG_POLL_INTERVAL", "30"))

# Consumer groups to sample: comma-separated, names trimmed, empty items dropped.
GROUPS = [
    name
    for name in map(
        str.strip,
        os.getenv("CONSUMER_GROUPS", "obmp-psql-consumer,evpn-psql").split(","),
    )
    if name
]
def sample_group(admin, consumer, group):
    """Return (lag_rows, member_count) for one consumer group.

    lag_rows: [(group, topic, partition, committed, log_end, lag), ...]

    Args:
        admin: admin client used to list the group's committed offsets and to
            describe the group.
        consumer: consumer used only to query partition watermark offsets.
        group: id of the consumer group to sample.

    Returns:
        A ``(rows, members)`` tuple. ``rows`` holds one tuple per partition
        whose committed offset and log-end offset could both be obtained;
        partitions that fail are logged and left out. ``members`` is the
        number of group members, or ``None`` if the group could not be
        described.

    Raises:
        Any exception raised while listing the group's committed offsets;
        without them there is nothing to sample, so the caller decides what
        to do with the group.
    """
    futs = admin.list_consumer_group_offsets(
        [ConsumerGroupTopicPartitions(group)])
    offsets = futs[group].result(timeout=30)

    rows = []
    for tp in offsets.topic_partitions:
        where = f"{tp.topic}[{tp.partition}]"

        # A partition-level error means tp.offset is not a real commit; skip
        # it instead of recording a bogus committed=0 sample.
        tp_error = getattr(tp, "error", None)
        if tp_error:
            print(f" [{group}] {where} committed offset unavailable: "
                  f"{tp_error}", flush=True)
            continue

        try:
            watermarks = consumer.get_watermark_offsets(
                TopicPartition(tp.topic, tp.partition), timeout=10, cached=False)
        except Exception as e:
            # Best effort: one unreachable partition must not lose the rest
            # of the group, but the gap should be visible in the log.
            print(f" [{group}] {where} watermark query failed: {e}",
                  flush=True)
            continue
        if watermarks is None:
            # get_watermark_offsets signals a timeout by returning None.
            print(f" [{group}] {where} watermark query timed out",
                  flush=True)
            continue
        _, log_end = watermarks

        # A missing or negative offset means the group has not committed on
        # this partition yet; it is recorded as 0.
        if tp.offset is not None and tp.offset >= 0:
            committed = tp.offset
        else:
            committed = 0

        rows.append((group, tp.topic, tp.partition, committed, log_end,
                     max(log_end - committed, 0)))

    members = None
    try:
        desc = admin.describe_consumer_groups([group])[group].result(timeout=30)
        members = len(desc.members)
    except Exception as e:
        # The member count is optional; keep the lag rows and report the gap.
        print(f" [{group}] describe failed: {e}", flush=True)
    return rows, members
def _run_cycle(admin, consumer):
    """Sample every group in GROUPS once and store the results in PostgreSQL.

    A fresh connection is opened for the cycle and is always closed, even
    when an INSERT or the commit fails, so a failing cycle cannot leak
    connections. Closing without a commit discards the partial cycle.

    Raises:
        Any database error; the caller logs it and retries on the next cycle.
    """
    conn = psycopg2.connect(PG_DSN)
    try:
        with conn.cursor() as cur:
            for group in GROUPS:
                try:
                    rows, members = sample_group(admin, consumer, group)
                except Exception as e:
                    # One failing group must not block the remaining groups.
                    print(f" [{group}] sample failed: {e}", flush=True)
                    continue
                for r in rows:
                    cur.execute(
                        "INSERT INTO kafka_consumer_lag (group_id, topic, "
                        "partition, committed, log_end, lag) "
                        "VALUES (%s,%s,%s,%s,%s,%s)", r)
                if members is not None:
                    cur.execute(
                        "INSERT INTO kafka_consumer_members (group_id, "
                        "members) VALUES (%s,%s)", (group, members))
                total = sum(r[5] for r in rows)
                print(f" [{group}] {len(rows)} partitions, "
                      f"total lag={total}, members={members}", flush=True)
        conn.commit()
    finally:
        conn.close()


def main():
    """Run the sampling loop forever, one cycle every INTERVAL seconds.

    A failed cycle is logged with its traceback and the loop carries on. The
    probe consumer is closed when the loop is left (e.g. on KeyboardInterrupt
    or process shutdown via an exception).
    """
    print(f"kafka-lag-monitor starting; broker={BROKER}, groups={GROUPS}, "
          f"interval={INTERVAL}s", flush=True)
    admin = AdminClient({"bootstrap.servers": BROKER})
    consumer = Consumer({"bootstrap.servers": BROKER,
                         "group.id": "kafka-lag-monitor-probe",
                         "enable.auto.commit": False})
    try:
        while True:
            # Monotonic clock: a wall-clock adjustment must not stretch or
            # shrink the pause between cycles.
            start = time.monotonic()
            try:
                _run_cycle(admin, consumer)
            except Exception as e:
                print(f"cycle failed: {e}", flush=True)
                traceback.print_exc()
            # Subtract the time the cycle took so cycles start INTERVAL apart.
            time.sleep(max(0, INTERVAL - (time.monotonic() - start)))
    finally:
        consumer.close()
# Start the monitor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()