obmp-docker/postgres/scripts/009_kafka_lag.sql
sam b681c473c0 Add Policy Diff, fleet-wide full-table feed, and Kafka lag monitoring
Policy Diff (roadmap E2 follow-up): obmp-rib-poller pulls per-router
post-policy accepted/advertised prefix counts and route-policy bindings
over CLI+NETCONF (BMP on XRv9000 24.3.1 carries only pre-policy
Adj-RIB-In). New tables in 008_obmp_policy_diff.sql; Policy Diff
dashboard joins them against BMP ip_rib for received-vs-kept-vs-rejected.

GoBGP fleet-wide feed: GoBGP re-advertises the full Bromirski table to
both labs' core routers (CML AS65020, PROX AS65021) over eBGP; as route
reflectors the cores propagate it to every R9K client, so all 18 lab
routers carry and BMP-export a full table -- an intentional stress test
of the ingestion/storage path. cml/gobgp_peering_config.py applies and
rolls back the core-side config; gobgp/README.md documents the rollback.

Kafka lag monitoring: kafka-lag-monitor samples consumer-group lag every
30s into TimescaleDB (009_kafka_lag.sql); Kafka Ingestion Lag dashboard
gives visibility into the pipeline under churn load.

Peer Detail dashboard: the Peer selector is now router-qualified
(router -> peer) so it is unambiguous in an iBGP route-reflector mesh.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 12:42:25 -07:00


-- 009_kafka_lag.sql
-- Kafka consumer-group lag history for the OpenBMP ingestion path, written by
-- the kafka-lag-monitor service every ~30s. Backs the Kafka Ingestion Lag
-- dashboard so the ingestion path can be sanity-checked: watch lag spike during
-- a BGP convergence storm and drain again, and confirm the consumer member
-- count when psql-app is scaled out.
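-- One row per (group, topic, partition) per sample.
CREATE TABLE IF NOT EXISTS kafka_consumer_lag (
    ts         timestamptz  NOT NULL DEFAULT now(),  -- sample time
    group_id   varchar(128) NOT NULL,                -- Kafka consumer group
    topic      varchar(200) NOT NULL,
    partition  integer      NOT NULL,
    committed  bigint,                               -- group's committed offset
    log_end    bigint,                               -- partition log-end offset
    lag        bigint                                -- lag as reported by the monitor
);
-- One row per group per sample.
CREATE TABLE IF NOT EXISTS kafka_consumer_members (
    ts        timestamptz  NOT NULL DEFAULT now(),   -- sample time
    group_id  varchar(128) NOT NULL,                 -- Kafka consumer group
    members   integer                                -- consumer member count
);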
SELECT create_hypertable('kafka_consumer_lag', 'ts', if_not_exists => TRUE);
SELECT create_hypertable('kafka_consumer_members', 'ts', if_not_exists => TRUE);
CREATE INDEX IF NOT EXISTS idx_kafka_lag_group_ts
    ON kafka_consumer_lag (group_id, ts DESC);
CREATE INDEX IF NOT EXISTS idx_kafka_members_group_ts
    ON kafka_consumer_members (group_id, ts DESC);
-- Bound history growth (~250k rows/day across ~88 partitions at 30s). Wrapped
-- so the script still succeeds if the TimescaleDB job scheduler is unavailable.
DO $$ BEGIN
    -- if_not_exists keeps re-runs quiet when the policy is already in place.
    PERFORM add_retention_policy('kafka_consumer_lag', INTERVAL '14 days', if_not_exists => TRUE);
    PERFORM add_retention_policy('kafka_consumer_members', INTERVAL '14 days', if_not_exists => TRUE);
EXCEPTION WHEN OTHERS THEN
    RAISE NOTICE 'kafka lag retention policy not added: %', SQLERRM;
END $$;
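
For orientation, here is a minimal sketch of the kind of queries a lag dashboard could run against these tables. It is not taken from the actual dashboard definition. The 6-hour and 5-minute windows, the 1-minute bucket, and the output column aliases are assumptions chosen for illustration. Only the table and column names come from the script above.

```sql
-- Worst per-partition lag for each consumer group, in 1-minute buckets.
-- Shows lag spiking during churn and draining afterwards.
SELECT time_bucket(INTERVAL '1 minute', l.ts) AS bucket,
       l.group_id,
       max(l.lag) AS worst_partition_lag
FROM kafka_consumer_lag AS l
WHERE l.ts > now() - INTERVAL '6 hours'
GROUP BY bucket, l.group_id
ORDER BY bucket, l.group_id;

-- Most recent member count per consumer group, e.g. to confirm a scale-out.
-- The (group_id, ts DESC) index matches this access pattern.
SELECT DISTINCT ON (m.group_id)
       m.group_id,
       m.ts,
       m.members
FROM kafka_consumer_members AS m
WHERE m.ts > now() - INTERVAL '5 minutes'
ORDER BY m.group_id, m.ts DESC;
```

Both queries filter on ts, so TimescaleDB can skip chunks outside the window. Widening the window trades dashboard latency for history.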