#!/usr/bin/env python3
"""kafka-lag-monitor -- samples Kafka consumer-group lag into PostgreSQL.

Every LAG_POLL_INTERVAL seconds it records, per (group, topic, partition), the
committed offset, log-end offset and lag, plus the group's active member count.
The Kafka Lag dashboard reads kafka_consumer_lag / kafka_consumer_members
(postgres/scripts/009_kafka_lag.sql) so the ingestion path can be
sanity-checked -- watch lag spike during a BGP convergence storm and drain
again, and confirm the member count when psql-app is scaled out.
"""
import os
import time
import traceback

import psycopg2
from confluent_kafka import Consumer, TopicPartition, ConsumerGroupTopicPartitions
from confluent_kafka.admin import AdminClient

# All settings come from the environment, with the defaults shown here.
BROKER = os.environ.get("KAFKA_BROKER", "obmp-kafka:29092")
PG_DSN = os.environ.get(
    "PG_DSN",
    "host=obmp-psql port=5432 dbname=openbmp user=openbmp password=openbmp",
)
INTERVAL = int(os.environ.get("LAG_POLL_INTERVAL", "30"))
# Comma-separated list of group ids; blank entries are dropped.
GROUPS = [g.strip() for g in os.environ.get(
    "CONSUMER_GROUPS", "obmp-psql-consumer,evpn-psql").split(",") if g.strip()]

_LAG_INSERT_SQL = (
    "INSERT INTO kafka_consumer_lag (group_id, topic, "
    "partition, committed, log_end, lag) "
    "VALUES (%s,%s,%s,%s,%s,%s)"
)
_MEMBERS_INSERT_SQL = (
    "INSERT INTO kafka_consumer_members (group_id, "
    "members) VALUES (%s,%s)"
)


def sample_group(admin, consumer, group):
    """Return (lag_rows, member_count) for one consumer group.

    lag_rows: [(group, topic, partition, committed, log_end, lag), ...]
    member_count: number of members reported for the group, or None when the
    group could not be described.

    Partitions whose committed offset or log-end offset cannot be read are
    logged and left out of lag_rows. A failure to list the group's offsets at
    all propagates to the caller.
    """
    futs = admin.list_consumer_group_offsets(
        [ConsumerGroupTopicPartitions(group)])
    offsets = futs[group].result(timeout=30)

    rows = []
    for tp in offsets.topic_partitions:
        # A per-partition error means the offset field is not a real commit;
        # recording it would fabricate a lag value.
        if tp.error is not None:
            print(f"  [{group}] {tp.topic}[{tp.partition}] committed offset "
                  f"unavailable: {tp.error}", flush=True)
            continue

        # A missing or negative offset means nothing is committed yet; it is
        # recorded as 0, so the reported lag equals the log-end offset.
        committed = tp.offset if (tp.offset is not None and tp.offset >= 0) else 0

        try:
            watermarks = consumer.get_watermark_offsets(
                TopicPartition(tp.topic, tp.partition), timeout=10, cached=False)
        except Exception as e:
            # Best effort: one unreachable partition must not drop the group.
            print(f"  [{group}] {tp.topic}[{tp.partition}] watermark query "
                  f"failed: {e}", flush=True)
            continue
        if watermarks is None:
            # get_watermark_offsets returns None rather than raising on timeout.
            print(f"  [{group}] {tp.topic}[{tp.partition}] watermark query "
                  f"timed out", flush=True)
            continue

        _, log_end = watermarks
        # Clamp at 0: the commit can be ahead of the watermark read just now.
        rows.append((group, tp.topic, tp.partition, committed, log_end,
                     max(log_end - committed, 0)))

    members = None
    try:
        desc = admin.describe_consumer_groups([group])[group].result(timeout=30)
        members = len(desc.members)
    except Exception as e:
        # Best effort: lag rows are still useful without a member count.
        print(f"  [{group}] member count unavailable: {e}", flush=True)
    return rows, members


def _store_sample(cur, group, rows, members):
    """Insert one group's lag rows and, when known, its member count."""
    cur.executemany(_LAG_INSERT_SQL, rows)
    if members is not None:
        cur.execute(_MEMBERS_INSERT_SQL, (group, members))


def _run_cycle(admin, consumer):
    """Sample every configured group and store the results in one transaction."""
    conn = psycopg2.connect(PG_DSN)
    try:
        with conn.cursor() as cur:
            for group in GROUPS:
                try:
                    rows, members = sample_group(admin, consumer, group)
                except Exception as e:
                    print(f"  [{group}] sample failed: {e}", flush=True)
                    continue
                _store_sample(cur, group, rows, members)
                total = sum(r[5] for r in rows)
                print(f"  [{group}] {len(rows)} partitions, "
                      f"total lag={total}, members={members}", flush=True)
        conn.commit()
    finally:
        # Always release the connection; closing without a commit discards
        # the partial cycle instead of leaking a session per failure.
        conn.close()


def main():
    """Run the sampling loop forever, one cycle every INTERVAL seconds."""
    print(f"kafka-lag-monitor starting; broker={BROKER}, groups={GROUPS}, "
          f"interval={INTERVAL}s", flush=True)
    admin = AdminClient({"bootstrap.servers": BROKER})
    # This consumer is only used to query watermarks; it never subscribes
    # or commits.
    consumer = Consumer({"bootstrap.servers": BROKER,
                         "group.id": "kafka-lag-monitor-probe",
                         "enable.auto.commit": False})
    try:
        while True:
            # Monotonic clock: wall-clock steps must not distort the pacing.
            start = time.monotonic()
            try:
                _run_cycle(admin, consumer)
            except Exception as e:
                print(f"cycle failed: {e}", flush=True)
                traceback.print_exc()
            time.sleep(max(0, INTERVAL - (time.monotonic() - start)))
    finally:
        consumer.close()


if __name__ == "__main__":
    main()