97 lines
3.8 KiB
Python
97 lines
3.8 KiB
Python
|
|
#!/usr/bin/env python3
"""kafka-lag-monitor -- samples Kafka consumer-group lag into PostgreSQL.

Every LAG_POLL_INTERVAL seconds it records, per (group, topic, partition), the
committed offset, log-end offset and lag, plus the group's active member
count. The Kafka Lag dashboard reads kafka_consumer_lag / kafka_consumer_members
(postgres/scripts/009_kafka_lag.sql) so the ingestion path can be sanity-
checked -- watch lag spike during a BGP convergence storm and drain again, and
confirm the member count when psql-app is scaled out.
"""
|
||
|
|
import os
|
||
|
|
import time
|
||
|
|
import traceback
|
||
|
|
|
||
|
|
import psycopg2
|
||
|
|
from confluent_kafka import Consumer, TopicPartition, ConsumerGroupTopicPartitions
|
||
|
|
from confluent_kafka.admin import AdminClient
|
||
|
|
|
||
|
|
# Kafka bootstrap server(s); used for both the admin client and the probe
# consumer created in main().
BROKER = os.environ.get("KAFKA_BROKER", "obmp-kafka:29092")

# Connection string handed to psycopg2.connect() at the start of every cycle.
PG_DSN = os.environ.get(
    "PG_DSN",
    "host=obmp-psql port=5432 dbname=openbmp user=openbmp password=openbmp",
)

# Seconds between the starts of two consecutive sampling cycles.
INTERVAL = int(os.environ.get("LAG_POLL_INTERVAL", "30"))

# Consumer groups to sample: a comma-separated list, with surrounding
# whitespace trimmed and empty entries dropped.
GROUPS = [
    name
    for name in (
        piece.strip()
        for piece in os.environ.get(
            "CONSUMER_GROUPS", "obmp-psql-consumer,evpn-psql").split(",")
    )
    if name
]
|
||
|
|
|
||
|
|
|
||
|
|
def sample_group(admin, consumer, group):
    """Return (lag_rows, member_count) for one consumer group.

    Args:
        admin: admin client used to list the group's committed offsets and
            to describe the group.
        consumer: consumer used only to query partition watermark offsets.
        group: id of the consumer group to sample.

    Returns:
        A ``(lag_rows, member_count)`` tuple.

        lag_rows: [(group, topic, partition, committed, log_end, lag), ...]
        with ``lag`` clamped at zero. Partitions whose committed offset
        carries an error, or whose watermark lookup fails or times out,
        are logged and left out.

        member_count: number of members in the group description, or None
        when the description could not be fetched (the failure is logged).

    Raises:
        Exception: whatever the committed-offset lookup raises; that
            failure is not handled here and is left to the caller.
    """
    request = [ConsumerGroupTopicPartitions(group)]
    offsets = admin.list_consumer_group_offsets(request)[group].result(
        timeout=30)

    rows = []
    for tp in offsets.topic_partitions:
        where = f"{tp.topic}[{tp.partition}]"

        # A per-partition error means tp.offset is not a real committed
        # offset; recording it would produce a bogus lag value.
        if tp.error is not None:
            print(f" [{group}] {where} committed offset error: {tp.error}",
                  flush=True)
            continue

        # A missing or negative offset is stored as 0.
        committed = tp.offset if (tp.offset is not None and tp.offset >= 0) else 0

        try:
            watermarks = consumer.get_watermark_offsets(
                TopicPartition(tp.topic, tp.partition), timeout=10, cached=False)
        except Exception as e:
            # Best effort: one unreachable partition must not hide the rest.
            print(f" [{group}] {where} watermark lookup failed: {e}",
                  flush=True)
            continue
        if watermarks is None:
            # The lookup reports a timeout by returning None.
            print(f" [{group}] {where} watermark lookup timed out",
                  flush=True)
            continue

        _, log_end = watermarks
        rows.append((group, tp.topic, tp.partition, committed, log_end,
                     max(log_end - committed, 0)))

    members = None
    try:
        desc = admin.describe_consumer_groups([group])[group].result(timeout=30)
        members = len(desc.members)
    except Exception as e:
        # Best effort: lag rows are still useful without the member count.
        print(f" [{group}] describe failed: {e}", flush=True)
    return rows, members
|
||
|
|
|
||
|
|
|
||
|
|
def main():
    """Run the sampling loop until the process is interrupted.

    Creates one admin client and one probe consumer for BROKER. Each cycle
    opens a PostgreSQL connection with PG_DSN, samples every group in
    GROUPS via sample_group(), inserts the results into kafka_consumer_lag
    and kafka_consumer_members, and commits once at the end of the cycle.

    A group whose sampling raises is logged and skipped. Any other failure
    in a cycle is logged with a traceback and the loop carries on. The
    connection is closed on every path, so a failed cycle does not leak it.
    Cycles start INTERVAL seconds apart, or back-to-back when a cycle takes
    longer than INTERVAL.
    """
    print(f"kafka-lag-monitor starting; broker={BROKER}, groups={GROUPS}, "
          f"interval={INTERVAL}s", flush=True)
    admin = AdminClient({"bootstrap.servers": BROKER})
    # The probe consumer is only handed to sample_group(); auto-commit is
    # disabled so it does not commit offsets for its own group id.
    consumer = Consumer({"bootstrap.servers": BROKER,
                         "group.id": "kafka-lag-monitor-probe",
                         "enable.auto.commit": False})
    try:
        while True:
            start = time.time()
            try:
                conn = psycopg2.connect(PG_DSN)
                try:
                    with conn.cursor() as cur:
                        for group in GROUPS:
                            try:
                                rows, members = sample_group(
                                    admin, consumer, group)
                            except Exception as e:
                                print(f" [{group}] sample failed: {e}",
                                      flush=True)
                                continue
                            cur.executemany(
                                "INSERT INTO kafka_consumer_lag (group_id, topic, "
                                "partition, committed, log_end, lag) "
                                "VALUES (%s,%s,%s,%s,%s,%s)", rows)
                            if members is not None:
                                cur.execute(
                                    "INSERT INTO kafka_consumer_members (group_id, "
                                    "members) VALUES (%s,%s)", (group, members))
                            total = sum(r[5] for r in rows)
                            print(f" [{group}] {len(rows)} partitions, "
                                  f"total lag={total}, members={members}",
                                  flush=True)
                    conn.commit()
                finally:
                    # Close even when sampling or an INSERT raised; without
                    # this every failed cycle leaves a connection open.
                    conn.close()
            except Exception as e:
                print(f"cycle failed: {e}", flush=True)
                traceback.print_exc()
            # Sleep only for what is left of the interval after the cycle.
            time.sleep(max(0, INTERVAL - (time.time() - start)))
    finally:
        # Reached when the loop is left by KeyboardInterrupt / SystemExit.
        consumer.close()
|
||
|
|
|
||
|
|
|
||
|
|
# Start the monitor only when this file is executed as a script, so that
# importing the module has no side effect beyond reading its configuration.
if __name__ == "__main__":
    main()
|