Add Policy Diff, fleet-wide full-table feed, and Kafka lag monitoring

Policy Diff (roadmap E2 follow-up): obmp-rib-poller pulls per-router
post-policy accepted/advertised prefix counts and route-policy bindings
over CLI+NETCONF (BMP on XRv9000 24.3.1 carries only pre-policy
Adj-RIB-In). New tables in 008_obmp_policy_diff.sql; Policy Diff
dashboard joins them against BMP ip_rib for received-vs-kept-vs-rejected.

GoBGP fleet-wide feed: GoBGP re-advertises the full Bromirski table to
both labs' core routers (CML AS65020, PROX AS65021) over eBGP; as route
reflectors the cores propagate it to every R9K client, so all 18 lab
routers carry and BMP-export a full table -- an intentional stress test
of the ingestion/storage path. cml/gobgp_peering_config.py applies and
rolls back the core-side config; gobgp/README.md documents the rollback.

Kafka lag monitoring: kafka-lag-monitor samples consumer-group lag every
30s into TimescaleDB (009_kafka_lag.sql); Kafka Ingestion Lag dashboard
gives visibility into the pipeline under churn load.

Peer Detail dashboard: the Peer selector is now router-qualified
(router -> peer) so it is unambiguous in an iBGP route-reflector mesh.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
sam 2026-05-19 12:42:25 -07:00
parent 565ebdbee0
commit b681c473c0
15 changed files with 1124 additions and 10 deletions

197
cml/gobgp_peering_config.py Normal file
View File

@ -0,0 +1,197 @@
#!/usr/bin/env python3
"""Peer the lab core routers with the GoBGP full-table feed (roadmap E1).
GoBGP (AS65001, router-id 10.40.40.250) holds the full real Internet table
pulled from the Bromirski route server. This script configures the core
routers of both labs -- CML CORE-01/CORE-02 (AS65020) and PROX
CORE-01/CORE-02 (AS65021) -- to peer eBGP with GoBGP and accept that table.
As route reflectors the cores then propagate it to every R9K client -- so all
18 lab routers carry and BMP-export a full table. This is an intentional lab
stress test of the OpenBMP ingestion/storage path.
Applied per core (additive -- no existing session/policy is modified):
* route-policy GOBGP-FEED-PASS (a plain `pass` policy; eBGP needs one)
* neighbor 10.40.40.202 remote-as 65001, ebgp-multihop, mgmt-sourced,
IPv4-unicast only, with a maximum-prefix safety cap.
The matching GoBGP side is gobgp/gobgpd.conf (neighbors 10.100.0.100/.200 for
CML and 10.100.1.100/.200 for PROX); recreate GoBGP after applying so that it
re-reads the file: docker compose up -d --force-recreate gobgp
IOS-XR BMP config is not exposed via NETCONF on this release, so -- like
cml/proxmox_bmp_config.py -- this applies config over the SSH CLI.
Covers both labs: CML cores (AS65020) and PROX cores (AS65021).
Usage:
python3 cml/gobgp_peering_config.py # apply, all 4 cores
python3 cml/gobgp_peering_config.py cml # apply, CML cores only
python3 cml/gobgp_peering_config.py prox # apply, PROX cores only
python3 cml/gobgp_peering_config.py --remove # ROLLBACK, all cores
python3 cml/gobgp_peering_config.py --remove prox # ROLLBACK, PROX only
Rollback: `--remove` deletes the GoBGP neighbor and the GOBGP-FEED-PASS
policy from the cores. To stop the feed instantly without touching router
config, `docker compose stop gobgp` -- the eBGP sessions drop and the full
table is withdrawn fleet-wide within seconds. See gobgp/README.md.
"""
import sys
import time
import paramiko
# GoBGP runs network_mode: host, so it sources BGP TCP from the host's real
# interface IP (10.40.40.202) -- NOT its router-id 10.40.40.250. The cores
# must peer with the host IP.
GOBGP_IP = "10.40.40.202"
# GoBGP's local AS, i.e. the remote-as configured on the cores. Kept as a
# string because it is interpolated into the CLI config lines built below.
GOBGP_AS = "65001"
# Additive config, built per core (asn = that core's local BGP AS:
# CML lab = 65020, PROX lab = 65021). Flat formal-form lines applied at the
# (config)# prompt.
# IPv4-unicast only: the cores have no global IPv6 address, so an ipv6-unicast
# AF on this IPv4-transport session holds the whole neighbor Idle. The IPv6
# full-table feed is a separate phase (needs a v6-transport session or v6
# addressing on the cores).
def apply_lines(asn):
    """Return the flat IOS-XR config lines that add the GoBGP peering.

    The list starts with the GOBGP-FEED-PASS route-policy, followed by the
    neighbor-level settings and then the IPv4-unicast address-family settings,
    each written as a full formal-form line rooted at ``router bgp <asn>``.

    Args:
        asn: local BGP AS of the core being configured.

    Returns:
        A list of strings, one config line each, in the order to be sent.
    """
    neighbor = f"router bgp {asn} neighbor {GOBGP_IP}"
    ipv4_unicast = f"{neighbor} address-family ipv4 unicast"

    # Settings that hang directly off the neighbor.
    neighbor_settings = (
        f"remote-as {GOBGP_AS}",
        "description GoBGP full-table feed (lab stress test)",
        "ebgp-multihop 64",
        "update-source MgmtEth0/RP0/CPU0/0",
    )
    # Settings that live under the neighbor's IPv4-unicast address family.
    ipv4_settings = (
        "route-policy GOBGP-FEED-PASS in",
        "route-policy GOBGP-FEED-PASS out",
        "maximum-prefix 1500000 90",
    )

    config = ["route-policy GOBGP-FEED-PASS", " pass", "end-policy"]
    config.extend(f"{neighbor} {setting}" for setting in neighbor_settings)
    config.extend(f"{ipv4_unicast} {setting}" for setting in ipv4_settings)
    return config
# Rollback -- remove the neighbor (and its sub-config) then the policy.
def remove_lines(asn):
    """Return the config lines that roll the GoBGP peering back.

    The neighbor (with all of its sub-config) is deleted first, then the
    GOBGP-FEED-PASS route-policy.

    Args:
        asn: local BGP AS of the core being rolled back.

    Returns:
        A two-element list of ``no ...`` config lines.
    """
    drop_neighbor = f"no router bgp {asn} neighbor {GOBGP_IP}"
    drop_policy = "no route-policy GOBGP-FEED-PASS"
    return [drop_neighbor, drop_policy]
# (name, mgmt_ip, user, password, local_asn) -- both labs.
# The name is what the optional command-line target ("cml" / "prox") is
# matched against (case-insensitive substring), and local_asn is a string
# because it is interpolated into the generated config lines.
# NOTE(review): credentials are stored here in plain text -- acceptable only
# for an isolated lab; consider reading them from the environment.
CORES = [
("CML-CORE-01", "10.100.0.100", "webui", "cisco", "65020"),
("CML-CORE-02", "10.100.0.200", "webui", "cisco", "65020"),
("PROX-CORE-01", "10.100.1.100", "admin", "cisco", "65021"),
("PROX-CORE-02", "10.100.1.200", "admin", "cisco", "65021"),
]
def _drain(shell, settle=0.4, limit=20.0, until=None):
    """Collect pending output from an interactive shell channel.

    The channel is polled every ``settle`` seconds for at most ``limit``
    seconds. Without ``until`` the function returns as soon as a poll finds
    nothing waiting (the far end has gone quiet). With ``until`` it keeps
    polling through quiet periods and returns once that marker appears in the
    accumulated output, or when ``limit`` expires.

    Args:
        shell: channel-like object exposing ``recv_ready()`` and ``recv(n)``
            (``recv`` must return bytes).
        settle: seconds to sleep before each poll.
        limit: overall time budget in seconds.
        until: optional marker string (e.g. a prompt) to wait for; ``None``
            or an empty string means "read until quiet".

    Returns:
        Everything received, decoded as text with undecodable bytes replaced.
    """
    marker = until.encode() if until else None
    # Accumulate raw bytes and decode once at the end: decoding chunk by chunk
    # would corrupt a multi-byte character split across two recv() calls.
    received = bytearray()
    # Monotonic clock so a wall-clock adjustment cannot stretch or cut the wait.
    deadline = time.monotonic() + limit
    while time.monotonic() < deadline:
        time.sleep(settle)
        if shell.recv_ready():
            received += shell.recv(65535)
            if marker and marker in received:
                break
        elif marker is None:
            # Nothing waiting and no marker to wait for: output has settled.
            break
    return received.decode(errors="replace")
def configure_core(name, ip, user, pwd, asn, mode):
    """Apply or roll back the GoBGP peering on one core over the SSH CLI.

    Opens an interactive shell, enters config mode, sends the generated config
    lines, and commits only if the candidate configuration actually mentions
    the GoBGP neighbor or policy. After a commit it prints a one-line
    verification (BGP state on apply, neighbor-gone check on remove).

    Args:
        name: display name of the core (used in log output only).
        ip: management IP to SSH to.
        user: SSH username.
        pwd: SSH password.
        asn: local BGP AS of this core.
        mode: ``"apply"`` to add the peering; any other value rolls it back.

    Returns:
        True when the change was committed or nothing needed changing; False
        when config mode could not be entered, the commit reported an error,
        or any exception was raised.
    """
    verb = "applying" if mode == "apply" else "removing"
    lines = apply_lines(asn) if mode == "apply" else remove_lines(asn)
    print(f"\n=== {name} ({ip}) AS{asn} -- {verb} GoBGP peering ===")
    ssh = None
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ip, username=user, password=pwd, timeout=15,
                    look_for_keys=False, allow_agent=False)
        shell = ssh.invoke_shell(width=220, height=2000)
        # Give the device time to print its banner/prompt, then discard it.
        time.sleep(2)
        shell.recv(65535)
        CFG = "(config)#"
        shell.send("terminal length 0\n")
        _drain(shell, 0.4, 5)
        shell.send("configure terminal\n")
        out = _drain(shell, 0.4, 15, until=CFG)
        if CFG not in out:
            print(f" FAIL: could not enter config mode\n {out[-200:]}")
            return False
        for line in lines:
            shell.send(line + "\n")
            time.sleep(0.4)
            _drain(shell, 0.3, 8, until=CFG)
        # Inspect the uncommitted candidate config: if it mentions neither the
        # neighbor nor the policy, there is nothing to commit.
        shell.send("show configuration\n")
        cand = _drain(shell, 0.3, 10, until=CFG)
        if GOBGP_IP not in cand and "GOBGP-FEED-PASS" not in cand:
            print(f" OK: no changes ({mode} already in effect)")
            shell.send("abort\n")
            _drain(shell, 0.5, 5)
            return True
        shell.send("commit\n")
        result = _drain(shell, 0.3, 25, until=CFG)
        if "fail" in result.lower() or "error" in result.lower():
            print(f" FAIL: commit error\n {result[-300:]}")
            shell.send("abort\n")
            _drain(shell, 0.5, 5)
            return False
        shell.send("end\n")
        _drain(shell, 1.0, 8)
        if mode == "apply":
            shell.send(f"show bgp ipv4 unicast neighbors {GOBGP_IP} | include BGP state\n")
            verify = _drain(shell, 1.0, 12)
            state = next((ln.strip() for ln in verify.splitlines()
                          if "BGP state" in ln), "(state not yet reported)")
            print(f" committed. {state}")
        else:
            shell.send(f"show running-config router bgp | include {GOBGP_IP}\n")
            verify = _drain(shell, 1.0, 12)
            # Strip the echoed command so its own IP does not count as a hit.
            gone = GOBGP_IP not in verify.replace(f"include {GOBGP_IP}", "")
            print(f" committed. neighbor removed: {gone}")
        return True
    except Exception as e:
        # Per-core boundary: report and let the caller continue with the rest.
        print(f" FAIL: {e}")
        return False
    finally:
        # Close on every path, including exceptions raised mid-session, so a
        # failed run does not leave an SSH session open on the router.
        if ssh is not None:
            ssh.close()
def main():
    """Command-line entry point: apply or roll back the peering on the cores.

    Arguments (from ``sys.argv``): an optional ``--remove`` flag selects
    rollback instead of apply, and an optional first positional argument
    restricts the run to cores whose name contains it (case-insensitive).

    Exits with status 0 when every selected core succeeded, 1 when any failed,
    and 2 when the target matched no core at all.
    """
    argv = sys.argv[1:]
    mode = "remove" if "--remove" in argv else "apply"
    positional = [a for a in argv if a != "--remove"]
    target = positional[0].lower() if positional else None
    if mode == "remove":
        print("ROLLBACK: removing GoBGP peering from the core routers.")
    results = {}
    for name, ip, user, pwd, asn in CORES:
        if target and target not in name.lower():
            continue
        results[name] = configure_core(name, ip, user, pwd, asn, mode)
    if not results:
        # all() of an empty dict is True, so without this guard a mistyped
        # target would report success without touching a single router.
        print(f"No core matches target {target!r}; nothing was changed.")
        sys.exit(2)
    print(f"\n{'='*48}\n SUMMARY ({mode})")
    for name, ok in results.items():
        print(f" {name:22s} {'OK' if ok else 'FAILED'}")
    # A bind-mounted config change alone does not make `up -d` recreate the
    # container, hence --force-recreate in the hints below.
    if mode == "apply":
        print("\nNext: restart GoBGP to load the new neighbors:")
        print(" docker compose up -d --force-recreate gobgp")
    else:
        print("\nGoBGP container config still lists the cores; that is inert")
        print("with the neighbors removed. To fully revert, also restore the")
        print("previous gobgp/gobgpd.conf and run: "
              "docker compose up -d --force-recreate gobgp")
    sys.exit(0 if all(results.values()) else 1)


if __name__ == "__main__":
    main()

View File

@ -484,6 +484,42 @@ services:
- EVPN_TOPIC=openbmp.parsed.evpn - EVPN_TOPIC=openbmp.parsed.evpn
- PG_DSN=host=obmp-psql port=5432 dbname=openbmp user=openbmp password=${POSTGRES_PASSWORD:-openbmp} - PG_DSN=host=obmp-psql port=5432 dbname=openbmp user=openbmp password=${POSTGRES_PASSWORD:-openbmp}
# Per-router BGP policy-diff collector. Pulls post-policy accepted/advertised
# prefix counts and route-policy bindings from the IOS-XR routers over CLI +
# NETCONF (BMP on XRv9000 24.3.1 only carries pre-policy Adj-RIB-In). Feeds
# the Policy Diff dashboard. Host networking: it must reach the lab
# management network (10.100.0.x) and the published Postgres port.
rib-poller:
restart: unless-stopped
container_name: obmp-rib-poller
build:
context: ./obmp-rib-poller
network_mode: host
depends_on:
- psql
environment:
- PG_DSN=host=10.40.40.202 port=5432 dbname=openbmp user=openbmp password=${POSTGRES_PASSWORD:-openbmp}
- POLL_INTERVAL=900
- ROUTER_USER=webui
- ROUTER_PASS=cisco
# Samples Kafka consumer-group lag into PostgreSQL every 30s for the Kafka
# Lag dashboard -- visibility into the ingestion path under load (e.g. a
# full-table BGP convergence storm) and a sanity check when scaling psql-app.
kafka-lag-monitor:
restart: unless-stopped
container_name: obmp-kafka-lag-monitor
build:
context: ./kafka-lag-monitor
depends_on:
- kafka
- psql
environment:
- KAFKA_BROKER=obmp-kafka:29092
- PG_DSN=host=obmp-psql port=5432 dbname=openbmp user=openbmp password=${POSTGRES_PASSWORD:-openbmp}
- LAG_POLL_INTERVAL=30
- CONSUMER_GROUPS=obmp-psql-consumer,evpn-psql
whois: whois:
restart: unless-stopped restart: unless-stopped
container_name: obmp-whois container_name: obmp-whois

View File

@ -60,6 +60,69 @@ GoBGP reports using its `router-id` (`10.40.40.250`) and `local-as` (`65001`):
To find it in Grafana/SQL, filter on `peer_as = 57355` or the router-id above. To find it in Grafana/SQL, filter on `peer_as = 57355` or the router-id above.
## Fleet-wide full-table feed into the CML lab (stress test)
GoBGP additionally re-advertises the full table to the two CML core routers
(CORE-01/CORE-02, AS65020). As route reflectors the cores propagate it to all
seven R9K clients, so every lab router carries and BMP-exports a full table —
an intentional stress test of the OpenBMP ingestion/storage path (the database
grows toward ~55-65 GB).
- **GoBGP side** — `gobgpd.conf` neighbors `10.100.0.100` / `10.100.0.200`
(peer-as 65020, eBGP-multihop, IPv4-unicast only, `prefix-limit` caps). The
route-server sessions carry `default-export-policy = "reject-route"` so the
lab's own routes can never leak back to AS57355.
- **Router side** — `cml/gobgp_peering_config.py` adds the `neighbor
10.40.40.202` config (IPv4-unicast only, with a `maximum-prefix 1500000 90`
cap) to both cores. GoBGP is host-networked, so it sources BGP TCP from the
host IP `10.40.40.202`, not its router-id `10.40.40.250` — the cores peer
with the host IP.
### Apply
```sh
python3 cml/gobgp_peering_config.py # configure both cores
docker compose up -d --force-recreate gobgp # load gobgpd.conf changes
```
> A volume-mounted config change does NOT trigger a recreate on its own —
> `--force-recreate` is required for GoBGP to re-read `gobgpd.conf`.
### Rollback
**Emergency stop** (fastest — feed off within seconds, no router change):
```sh
docker compose stop gobgp
```
Stopping GoBGP drops the eBGP sessions; the cores withdraw the full table and
the withdrawal propagates to every client. The `ip_rib` rows are marked
withdrawn and aged out by the existing TimescaleDB retention.
**Full revert** (also removes the router-side config):
```sh
python3 cml/gobgp_peering_config.py --remove # delete neighbor from cores
docker compose stop gobgp
```
To keep the Bromirski feed running but drop only the lab injection, delete the
two `10.100.0.x` `[[neighbors]]` blocks from `gobgpd.conf` and
`docker compose up -d --force-recreate gobgp`.
### What to watch during convergence
```sh
docker exec obmp-gobgp gobgp neighbor # 4 sessions Establ
docker logs --tail 20 obmp-psql-app # consumer lag
docker exec obmp-psql psql -U openbmp -d openbmp -c \
"SELECT count(*) FROM ip_rib WHERE iswithdrawn = false;" # row growth
```
If `psql-app` consumer lag climbs without draining, or PostgreSQL CPU/IO
saturates, use the emergency stop above.
## MRT fallback ## MRT fallback
AS57355 is a **single volunteer-run host with no SLA** — it can and does go AS57355 is a **single volunteer-run host with no SLA** — it can and does go

View File

@ -5,20 +5,30 @@
# received route to the OpenBMP collector, where it lands in PostgreSQL ip_rib. # received route to the OpenBMP collector, where it lands in PostgreSQL ip_rib.
# Peering spec: https://lukasz.bromirski.net/post/bgp-w-labie-3/ # Peering spec: https://lukasz.bromirski.net/post/bgp-w-labie-3/
# #
# Receive-only: we announce NOTHING -- AS57355 explicitly asks peers not to # It ALSO re-advertises the full table to the two CML core routers
# send prefixes. Local AS is 65001 (the value the route server expects). # (CORE-01/CORE-02, AS65020) over eBGP. As route reflectors the cores
# Per the spec: eBGP multihop, no password, keepalive 3600 / hold-time 7200. # propagate it to every R9K client -- so all 9 lab routers carry and
# BMP-export a full table. This is an intentional lab stress test of the
# OpenBMP ingestion/storage path (~9x full feeds; DB grows to ~55-65 GB).
#
# Local AS is 65001 (the value the Bromirski route server expects).
# Bromirski peering: eBGP multihop, no password, keepalive 3600 / hold 7200.
# TOML syntax targets GoBGP v3.x / v4.x. # TOML syntax targets GoBGP v3.x / v4.x.
[global] [global]
[global.config] [global.config]
as = 65001 as = 65001
router-id = "10.40.40.250" router-id = "10.40.40.250"
# We only originate outbound sessions to the route server; disable the # We only originate outbound sessions (to the route server and to the
# inbound BGP listener (port -1) so the daemon needs no privileged # two cores) so the inbound BGP listener stays disabled (port -1) -- no
# (<1024) bind -- required under docker network_mode: host. # privileged (<1024) bind needed under docker network_mode: host.
port = -1 port = -1
# Note: once we peer with the cores, GoBGP learns the cores' lab routes over
# eBGP. To guarantee none of that leaks back to AS57355 (which asks peers to
# announce NOTHING), the route-server sessions below carry an apply-policy
# with default-export-policy = "reject-route" -- every export is dropped.
# --- Neighbor: route server, IPv4 feed -------------------------------------- # --- Neighbor: route server, IPv4 feed --------------------------------------
# The IPv4 transport session carries the full IPv4 table only. # The IPv4 transport session carries the full IPv4 table only.
[[neighbors]] [[neighbors]]
@ -35,6 +45,9 @@
[neighbors.transport.config] [neighbors.transport.config]
# we initiate the session; no local-address pinning # we initiate the session; no local-address pinning
passive-mode = false passive-mode = false
[neighbors.apply-policy.config]
# reject every export toward the route server
default-export-policy = "reject-route"
[[neighbors.afi-safis]] [[neighbors.afi-safis]]
[neighbors.afi-safis.config] [neighbors.afi-safis.config]
afi-safi-name = "ipv4-unicast" afi-safi-name = "ipv4-unicast"
@ -54,10 +67,94 @@
multihop-ttl = 64 multihop-ttl = 64
[neighbors.transport.config] [neighbors.transport.config]
passive-mode = false passive-mode = false
[neighbors.apply-policy.config]
# reject every export toward the route server
default-export-policy = "reject-route"
[[neighbors.afi-safis]] [[neighbors.afi-safis]]
[neighbors.afi-safis.config] [neighbors.afi-safis.config]
afi-safi-name = "ipv6-unicast" afi-safi-name = "ipv6-unicast"
# --- Neighbor: CML CORE-01 (AS65020) ----------------------------------------
# GoBGP initiates outbound to the core's mgmt IP (reachable from the docker
# host -- the cores already reach the host for BMP). GoBGP sources the session
# from the host IP 10.40.40.202. eBGP multihop: the host is several hops from
# the core. Default export policy (accept) re-advertises the full Bromirski
# table to the core. prefix-limit is a safety cap on what the core can send
# back (its lab routes only -- small).
# IPv4-unicast only: the cores have no global IPv6 address, so an ipv6 AF
# would hold the session Idle. IPv6 full-table feed is a separate phase.
[[neighbors]]
[neighbors.config]
neighbor-address = "10.100.0.100"
peer-as = 65020
description = "CML CORE-01 -- full-table injection (lab stress test)"
[neighbors.ebgp-multihop.config]
enabled = true
multihop-ttl = 64
[neighbors.transport.config]
passive-mode = false
[[neighbors.afi-safis]]
[neighbors.afi-safis.config]
afi-safi-name = "ipv4-unicast"
[neighbors.afi-safis.prefix-limit.config]
max-prefixes = 2000000
shutdown-threshold-pct = 90
# --- Neighbor: CML CORE-02 (AS65020) ----------------------------------------
[[neighbors]]
[neighbors.config]
neighbor-address = "10.100.0.200"
peer-as = 65020
description = "CML CORE-02 -- full-table injection (lab stress test)"
[neighbors.ebgp-multihop.config]
enabled = true
multihop-ttl = 64
[neighbors.transport.config]
passive-mode = false
[[neighbors.afi-safis]]
[neighbors.afi-safis.config]
afi-safi-name = "ipv4-unicast"
[neighbors.afi-safis.prefix-limit.config]
max-prefixes = 2000000
shutdown-threshold-pct = 90
# --- Neighbor: PROX CORE-01 (AS65021) ---------------------------------------
# Second lab. Same IPv4-unicast-only full-table injection as the CML cores.
[[neighbors]]
[neighbors.config]
neighbor-address = "10.100.1.100"
peer-as = 65021
description = "PROX CORE-01 -- full-table injection (lab stress test)"
[neighbors.ebgp-multihop.config]
enabled = true
multihop-ttl = 64
[neighbors.transport.config]
passive-mode = false
[[neighbors.afi-safis]]
[neighbors.afi-safis.config]
afi-safi-name = "ipv4-unicast"
[neighbors.afi-safis.prefix-limit.config]
max-prefixes = 2000000
shutdown-threshold-pct = 90
# --- Neighbor: PROX CORE-02 (AS65021) ---------------------------------------
[[neighbors]]
[neighbors.config]
neighbor-address = "10.100.1.200"
peer-as = 65021
description = "PROX CORE-02 -- full-table injection (lab stress test)"
[neighbors.ebgp-multihop.config]
enabled = true
multihop-ttl = 64
[neighbors.transport.config]
passive-mode = false
[[neighbors.afi-safis]]
[neighbors.afi-safis.config]
afi-safi-name = "ipv4-unicast"
[neighbors.afi-safis.prefix-limit.config]
max-prefixes = 2000000
shutdown-threshold-pct = 90
# --- BMP export to the OpenBMP collector ------------------------------------ # --- BMP export to the OpenBMP collector ------------------------------------
# GoBGP connects OUT to the collector. GoBGP's BMP config requires a literal # GoBGP connects OUT to the collector. GoBGP's BMP config requires a literal
# IP (it cannot resolve a hostname), so we target the docker host IP where the # IP (it cannot resolve a hostname), so we target the docker host IP where the

View File

@ -0,0 +1,8 @@
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY monitor.py .
CMD ["python", "-u", "monitor.py"]

View File

@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""kafka-lag-monitor -- samples Kafka consumer-group lag into PostgreSQL.
Every LAG_POLL_INTERVAL seconds it records, per (group, topic, partition), the
committed offset, log-end offset and lag, plus the group's active member
count. The Kafka Lag dashboard reads kafka_consumer_lag / kafka_consumer_members
(postgres/scripts/009_kafka_lag.sql) so the ingestion path can be sanity-
checked -- watch lag spike during a BGP convergence storm and drain again, and
confirm the member count when psql-app is scaled out.
"""
import os
import time
import traceback
import psycopg2
from confluent_kafka import Consumer, TopicPartition, ConsumerGroupTopicPartitions
from confluent_kafka.admin import AdminClient
# Runtime settings, each read once from the environment at import time with
# the default shown.
# Kafka bootstrap server used by both the admin client and the probe consumer.
BROKER = os.environ.get("KAFKA_BROKER", "obmp-kafka:29092")
# libpq-style connection string for the PostgreSQL database receiving samples.
PG_DSN = os.environ.get(
"PG_DSN",
"host=obmp-psql port=5432 dbname=openbmp user=openbmp password=openbmp",
)
# Seconds between sampling cycles; a non-integer value raises ValueError here.
INTERVAL = int(os.environ.get("LAG_POLL_INTERVAL", "30"))
# Consumer groups to sample: comma-separated, whitespace-trimmed, blanks dropped.
GROUPS = [g.strip() for g in os.environ.get(
"CONSUMER_GROUPS", "obmp-psql-consumer,evpn-psql").split(",") if g.strip()]
def sample_group(admin, consumer, group):
    """Return (lag_rows, member_count) for one consumer group.

    Args:
        admin: Kafka admin client used to list the group's committed offsets
            and to describe the group.
        consumer: Kafka consumer used only to query partition watermarks.
        group: consumer-group id to sample.

    Returns:
        A ``(lag_rows, member_count)`` tuple.
        lag_rows: [(group, topic, partition, committed, log_end, lag), ...]
        with one entry per partition whose watermark query succeeded; lag is
        clamped at 0. member_count is the number of group members, or None
        when the group could not be described.

    Raises:
        Whatever the committed-offset lookup raises; the caller treats that as
        a failed sample for this group. Per-partition watermark failures and
        describe failures are logged and skipped rather than raised.
    """
    futs = admin.list_consumer_group_offsets(
        [ConsumerGroupTopicPartitions(group)])
    offsets = futs[group].result(timeout=30)
    rows = []
    for tp in offsets.topic_partitions:
        # A missing or negative offset means nothing has been committed yet;
        # count from 0 in that case.
        committed = tp.offset if (tp.offset is not None and tp.offset >= 0) else 0
        try:
            watermarks = consumer.get_watermark_offsets(
                TopicPartition(tp.topic, tp.partition), timeout=10, cached=False)
        except Exception as e:
            print(f" [{group}] {tp.topic}[{tp.partition}] watermark query "
                  f"failed: {e}", flush=True)
            continue
        if watermarks is None:
            # The client reports a timeout as None rather than raising.
            print(f" [{group}] {tp.topic}[{tp.partition}] watermark query "
                  f"timed out", flush=True)
            continue
        _, log_end = watermarks
        rows.append((group, tp.topic, tp.partition, committed, log_end,
                     max(log_end - committed, 0)))
    members = None
    try:
        desc = admin.describe_consumer_groups([group])[group].result(timeout=30)
        members = len(desc.members)
    except Exception as e:
        # Best effort: lag rows are still useful without the member count.
        print(f" [{group}] describe failed: {e}", flush=True)
    return rows, members
def main():
    """Run the sampling loop forever.

    Each cycle opens a PostgreSQL connection, samples every configured group,
    inserts the lag rows and (when known) the member count, commits once, and
    closes the connection. A failure in one group is logged and skipped; a
    failure of the whole cycle is logged with a traceback and retried on the
    next interval. The loop sleeps for whatever remains of INTERVAL.
    """
    print(f"kafka-lag-monitor starting; broker={BROKER}, groups={GROUPS}, "
          f"interval={INTERVAL}s", flush=True)
    admin = AdminClient({"bootstrap.servers": BROKER})
    # Probe consumer: used only for watermark queries, so it never commits.
    consumer = Consumer({"bootstrap.servers": BROKER,
                         "group.id": "kafka-lag-monitor-probe",
                         "enable.auto.commit": False})
    while True:
        start = time.time()
        conn = None
        try:
            conn = psycopg2.connect(PG_DSN)
            with conn.cursor() as cur:
                for group in GROUPS:
                    try:
                        rows, members = sample_group(admin, consumer, group)
                    except Exception as e:
                        print(f" [{group}] sample failed: {e}", flush=True)
                        continue
                    cur.executemany(
                        "INSERT INTO kafka_consumer_lag (group_id, topic, "
                        "partition, committed, log_end, lag) "
                        "VALUES (%s,%s,%s,%s,%s,%s)", rows)
                    if members is not None:
                        cur.execute(
                            "INSERT INTO kafka_consumer_members (group_id, "
                            "members) VALUES (%s,%s)", (group, members))
                    total = sum(r[5] for r in rows)
                    print(f" [{group}] {len(rows)} partitions, "
                          f"total lag={total}, members={members}", flush=True)
            conn.commit()
        except Exception as e:
            print(f"cycle failed: {e}", flush=True)
            traceback.print_exc()
        finally:
            # Close on every path: otherwise each failed cycle would leave a
            # connection open against the database.
            if conn is not None:
                conn.close()
        time.sleep(max(0, INTERVAL - (time.time() - start)))


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,2 @@
confluent-kafka==2.5.3
psycopg2-binary==2.9.9

View File

@ -0,0 +1,112 @@
{
"annotations": {"list": [{"builtIn": 1,"datasource": {"type": "datasource","uid": "grafana"},"enable": true,"hide": true,"iconColor": "rgba(0, 211, 255, 1)","name": "Annotations & Alerts","type": "dashboard"}]},
"description": "Kafka consumer-group lag for the OpenBMP ingestion path, sampled every 30s by the kafka-lag-monitor service. Use it to sanity-check ingestion under load: lag spikes during a BGP convergence storm and should drain back to ~0; the consumer member count rises when psql-app is scaled out.",
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 1,
"id": null,
"links": [{"asDropdown": true,"icon": "external link","includeVars": true,"keepTime": true,"tags": ["obmp-nav"],"title": "OBMP Dashboards","type": "dashboards"}],
"liveNow": false,
"panels": [
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Total lag across all partitions at the latest sample.",
"fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "yellow","value": 50000},{"color": "red","value": 1000000}]}},"overrides": []},
"gridPos": {"h": 4,"w": 6,"x": 0,"y": 0},
"id": 1,
"options": {"colorMode": "background","graphMode": "area","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"},
"pluginVersion": "9.1.7",
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT sum(lag) AS \"Total Lag\" FROM kafka_consumer_lag WHERE group_id = '$group' AND ts = (SELECT max(ts) FROM kafka_consumer_lag WHERE group_id = '$group')","refId": "A"}],
"title": "Current Total Lag","type": "stat"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Active consumer members in the group at the latest sample. Rises when psql-app is scaled out.",
"fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "blue","value": null}]}},"overrides": []},
"gridPos": {"h": 4,"w": 6,"x": 6,"y": 0},
"id": 2,
"options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"},
"pluginVersion": "9.1.7",
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT members AS \"Consumers\" FROM kafka_consumer_members WHERE group_id = '$group' ORDER BY ts DESC LIMIT 1","refId": "A"}],
"title": "Active Consumers","type": "stat"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Topic-partitions tracked for the group at the latest sample.",
"fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "purple","value": null}]}},"overrides": []},
"gridPos": {"h": 4,"w": 6,"x": 12,"y": 0},
"id": 3,
"options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"},
"pluginVersion": "9.1.7",
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT count(*) AS \"Partitions\" FROM kafka_consumer_lag WHERE group_id = '$group' AND ts = (SELECT max(ts) FROM kafka_consumer_lag WHERE group_id = '$group')","refId": "A"}],
"title": "Partitions Monitored","type": "stat"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Highest total lag observed in the selected time range.",
"fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "yellow","value": 50000},{"color": "red","value": 1000000}]}},"overrides": []},
"gridPos": {"h": 4,"w": 6,"x": 18,"y": 0},
"id": 4,
"options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"},
"pluginVersion": "9.1.7",
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT max(t.total) AS \"Peak Lag\" FROM (SELECT ts, sum(lag) AS total FROM kafka_consumer_lag WHERE group_id = '$group' AND $__timeFilter(ts) GROUP BY ts) t","refId": "A"}],
"title": "Peak Lag (range)","type": "stat"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Total consumer lag over time. A healthy ingestion path returns to near-zero after a burst; sustained growth means consumers cannot keep up.",
"fieldConfig": {"defaults": {"color": {"mode": "palette-classic"},"custom": {"axisPlacement": "auto","drawStyle": "line","fillOpacity": 25,"lineInterpolation": "smooth","lineWidth": 1,"showPoints": "never","spanNulls": true},"unit": "short"},"overrides": []},
"gridPos": {"h": 9,"w": 12,"x": 0,"y": 4},
"id": 5,
"options": {"legend": {"calcs": ["max","last"],"displayMode": "table","placement": "bottom","showLegend": true},"tooltip": {"mode": "multi","sort": "desc"}},
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "time_series","rawSql": "SELECT ts AS time, sum(lag) AS \"Total lag\" FROM kafka_consumer_lag WHERE group_id = '$group' AND $__timeFilter(ts) GROUP BY ts ORDER BY ts","refId": "A"}],
"title": "Total Consumer Lag","type": "timeseries"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Active consumer members over time. Step changes correspond to psql-app scale events or rebalances.",
"fieldConfig": {"defaults": {"color": {"mode": "palette-classic"},"custom": {"axisPlacement": "auto","drawStyle": "line","fillOpacity": 15,"lineInterpolation": "stepAfter","lineWidth": 2,"showPoints": "never","spanNulls": true},"unit": "short"},"overrides": []},
"gridPos": {"h": 9,"w": 12,"x": 12,"y": 4},
"id": 6,
"options": {"legend": {"calcs": ["min","max"],"displayMode": "table","placement": "bottom","showLegend": true},"tooltip": {"mode": "single","sort": "none"}},
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "time_series","rawSql": "SELECT ts AS time, members AS \"Consumers\" FROM kafka_consumer_members WHERE group_id = '$group' AND $__timeFilter(ts) ORDER BY ts","refId": "A"}],
"title": "Consumer Members","type": "timeseries"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Lag broken down by topic. unicast_prefix and base_attribute carry the BGP route churn and dominate during a convergence storm.",
"fieldConfig": {"defaults": {"color": {"mode": "palette-classic"},"custom": {"axisPlacement": "auto","drawStyle": "line","fillOpacity": 20,"lineInterpolation": "smooth","lineWidth": 1,"showPoints": "never","spanNulls": true,"stacking": {"group": "A","mode": "normal"}},"unit": "short"},"overrides": []},
"gridPos": {"h": 9,"w": 12,"x": 0,"y": 13},
"id": 7,
"options": {"legend": {"calcs": ["last"],"displayMode": "table","placement": "bottom","showLegend": true},"tooltip": {"mode": "multi","sort": "desc"}},
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "time_series","rawSql": "SELECT ts AS time, topic AS metric, sum(lag) AS lag FROM kafka_consumer_lag WHERE group_id = '$group' AND $__timeFilter(ts) GROUP BY ts, topic ORDER BY ts","refId": "A"}],
"title": "Lag by Topic","type": "timeseries"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Per-partition lag for openbmp.parsed.unicast_prefix. A single deep partition that lags while others stay flat indicates a hot partition (skewed message keying) -- adding consumers gives it a dedicated thread but cannot split it.",
"fieldConfig": {"defaults": {"color": {"mode": "palette-classic"},"custom": {"axisPlacement": "auto","drawStyle": "line","fillOpacity": 10,"lineInterpolation": "smooth","lineWidth": 1,"showPoints": "never","spanNulls": true},"unit": "short"},"overrides": []},
"gridPos": {"h": 9,"w": 12,"x": 12,"y": 13},
"id": 8,
"options": {"legend": {"calcs": ["max","last"],"displayMode": "table","placement": "bottom","showLegend": true},"tooltip": {"mode": "multi","sort": "desc"}},
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "time_series","rawSql": "SELECT ts AS time, 'p' || partition AS metric, lag FROM kafka_consumer_lag WHERE group_id = '$group' AND topic = 'openbmp.parsed.unicast_prefix' AND $__timeFilter(ts) ORDER BY ts","refId": "A"}],
"title": "Lag by Partition (unicast_prefix)","type": "timeseries"
}
],
"refresh": "30s",
"schemaVersion": 36,
"style": "dark",
"tags": ["obmp", "obmp-nav", "telemetry", "kafka"],
"templating": {
"list": [
{"name": "group","type": "query","label": "Consumer Group","datasource": {"type": "postgres","uid": "obmp_postgres"},"query": "SELECT DISTINCT group_id FROM kafka_consumer_members ORDER BY 1","definition": "SELECT DISTINCT group_id FROM kafka_consumer_members ORDER BY 1","refresh": 1,"includeAll": false,"multi": false,"current": {"selected": true,"text": "obmp-psql-consumer","value": "obmp-psql-consumer"},"options": [],"sort": 1,"hide": 0}
]
},
"time": {"from": "now-3h","to": "now"},
"timepicker": {},
"timezone": "",
"title": "Kafka Ingestion Lag",
"uid": "kafka-lag",
"version": 1,
"weekStart": ""
}

View File

@ -1,6 +1,6 @@
{ {
"annotations": {"list": [{"builtIn": 1,"datasource": {"type": "datasource","uid": "grafana"},"enable": true,"hide": true,"iconColor": "rgba(0, 211, 255, 1)","name": "Annotations & Alerts","type": "dashboard"}]}, "annotations": {"list": [{"builtIn": 1,"datasource": {"type": "datasource","uid": "grafana"},"enable": true,"hide": true,"iconColor": "rgba(0, 211, 255, 1)","name": "Annotations & Alerts","type": "dashboard"}]},
"description": "Per-peer drilldown — BGP session identity, state history, prefix counts, update/withdraw rate, recent events and negotiated capabilities for a single BGP peer.", "description": "Per-peer drilldown — BGP session identity, state history, prefix counts, update/withdraw rate, recent events and negotiated capabilities for a single BGP session. The selector is router-qualified ('router -> peer'): prefix counts are routes RECEIVED from the selected peer (Adj-RIB-In). In a route-reflector mesh pick 'client -> core-loopback' to see a client's full received table; 'core -> client-loopback' shows only the client's originated routes.",
"editable": true, "editable": true,
"fiscalYearStartMonth": 0, "fiscalYearStartMonth": 0,
"graphTooltip": 1, "graphTooltip": 1,
@ -175,14 +175,14 @@
{ {
"current": {}, "current": {},
"datasource": {"type": "postgres","uid": "obmp_postgres"}, "datasource": {"type": "postgres","uid": "obmp_postgres"},
"definition": "select peername as __text, peer_hash_id as __value from v_peers where length(peername) > 0", "definition": "select routername || ' -> ' || peername as __text, peer_hash_id as __value from v_peers where length(peername) > 0 order by 1",
"hide": 0, "hide": 0,
"includeAll": false, "includeAll": false,
"label": "Peer", "label": "Router -> Peer",
"multi": false, "multi": false,
"name": "peer_hash", "name": "peer_hash",
"options": [], "options": [],
"query": "select peername as __text, peer_hash_id as __value from v_peers where length(peername) > 0", "query": "select routername || ' -> ' || peername as __text, peer_hash_id as __value from v_peers where length(peername) > 0 order by 1",
"refresh": 1, "refresh": 1,
"regex": "", "regex": "",
"skipUrlSync": false, "skipUrlSync": false,

View File

@ -0,0 +1,102 @@
{
"annotations": {"list": [{"builtIn": 1,"datasource": {"type": "datasource","uid": "grafana"},"enable": true,"hide": true,"iconColor": "rgba(0, 211, 255, 1)","name": "Annotations & Alerts","type": "dashboard"}]},
"description": "Per-router BGP policy diff: routes RECEIVED (BMP pre-policy Adj-RIB-In) vs KEPT (accepted into the BGP table, polled from the router) vs REJECTED, plus ADVERTISED counts and the bound inbound/outbound route-policy names. Kept/advertised data is collected by the obmp-rib-poller service over CLI+NETCONF because BMP on XRv9000 24.3.1 only carries pre-policy Adj-RIB-In. NOTE: Rejected = Received - Kept is everything BGP did not accept; inbound route-policy is one cause, alongside RR originator-id/cluster-list loop detection, AS-path loops and unreachable next-hops. Per-policy attribution is a future phase.",
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": null,
"links": [{"asDropdown": true,"icon": "external link","includeVars": true,"keepTime": true,"tags": ["obmp-nav"],"title": "OBMP Dashboards","type": "dashboards"}],
"liveNow": false,
"panels": [
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Routers with poller data in the selected scope.",
"fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "blue","value": null}]}},"overrides": []},
"gridPos": {"h": 4,"w": 6,"x": 0,"y": 0},
"id": 1,
"options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"},
"pluginVersion": "9.1.7",
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT count(DISTINCT rs.router_hash_id) AS \"Routers\" FROM router_rib_stats rs JOIN routers r ON r.hash_id = rs.router_hash_id WHERE r.name IN ($router)","refId": "A"}],
"title": "Routers Polled","type": "stat"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Neighbor address-families tracked (one row per router/neighbor/AF).",
"fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "purple","value": null}]}},"overrides": []},
"gridPos": {"h": 4,"w": 6,"x": 6,"y": 0},
"id": 2,
"options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"},
"pluginVersion": "9.1.7",
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT count(*) AS \"Neighbor AFs\" FROM router_rib_stats rs JOIN routers r ON r.hash_id = rs.router_hash_id WHERE r.name IN ($router)","refId": "A"}],
"title": "Neighbor AFs Tracked","type": "stat"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Total prefixes received (BMP) minus kept (polled), summed where positive, across BMP-monitored neighbors. Includes inbound route-policy denies AND BGP loop/validation rejections (RR originator-id, AS-path loop, next-hop) -- not policy-only.",
"fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "orange","value": 1}]}},"overrides": []},
"gridPos": {"h": 4,"w": 6,"x": 12,"y": 0},
"id": 3,
"options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"},
"pluginVersion": "9.1.7",
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT COALESCE(sum(GREATEST(COALESCE(rcv.received,0) - COALESCE(rs.accepted_count,0), 0)),0) AS \"Rejected\" FROM router_rib_stats rs JOIN routers r ON r.hash_id = rs.router_hash_id LEFT JOIN (SELECT bp.router_hash_id, bp.peer_addr, CASE WHEN ir.isipv4 THEN 'ipv4' ELSE 'ipv6' END AS afi, count(*) AS received FROM ip_rib ir JOIN bgp_peers bp ON bp.hash_id = ir.peer_hash_id WHERE ir.iswithdrawn = false GROUP BY bp.router_hash_id, bp.peer_addr, afi) rcv ON rcv.router_hash_id = rs.router_hash_id AND rcv.peer_addr = rs.peer_addr AND rcv.afi = rs.afi WHERE r.name IN ($router)","refId": "A"}],
"title": "Total Rejected (Recv - Kept)","type": "stat"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Distinct route-policies (RPL) stored from the routers.",
"fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "blue","value": null}]}},"overrides": []},
"gridPos": {"h": 4,"w": 6,"x": 18,"y": 0},
"id": 4,
"options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"},
"pluginVersion": "9.1.7",
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT count(*) AS \"Policies\" FROM route_policies rp JOIN routers r ON r.hash_id = rp.router_hash_id WHERE r.name IN ($router)","refId": "A"}],
"title": "Route Policies Stored","type": "stat"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Per neighbor address-family: Received = BMP pre-policy Adj-RIB-In count (blank if the neighbor is not BMP-monitored). Kept = prefixes accepted into the BGP table (polled). Rejected = Received - Kept (inbound route-policy denies plus BGP loop/validation rejections). Advertised = adj-rib-out size toward the neighbor. In/Out Policy = the bound route-policy names.",
"fieldConfig": {"defaults": {"custom": {"align": "auto","displayMode": "auto"}},"overrides": [{"matcher": {"id": "byName","options": "Rejected"},"properties": [{"id": "custom.displayMode","value": "color-text"},{"id": "thresholds","value": {"mode": "absolute","steps": [{"color": "text","value": null},{"color": "orange","value": 1}]}}]}]},
"gridPos": {"h": 12,"w": 24,"x": 0,"y": 4},
"id": 5,
"options": {"showHeader": true,"sortBy": [{"desc": true,"displayName": "Rejected"}]},
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT r.name AS \"Router\", host(rs.peer_addr) AS \"Neighbor\", rs.peer_as AS \"Peer AS\", rs.afi AS \"AF\", rs.session_state AS \"State\", rcv.received AS \"Received (BMP)\", rs.accepted_count AS \"Kept\", rcv.received - rs.accepted_count AS \"Rejected\", rs.advertised_count AS \"Advertised\", bin.policy_name AS \"In-Policy\", bout.policy_name AS \"Out-Policy\", rs.polled_at AS \"Polled\" FROM router_rib_stats rs JOIN routers r ON r.hash_id = rs.router_hash_id LEFT JOIN neighbor_policy_bind bin ON bin.router_hash_id = rs.router_hash_id AND bin.peer_addr = rs.peer_addr AND bin.afi = rs.afi AND bin.direction = 'in' LEFT JOIN neighbor_policy_bind bout ON bout.router_hash_id = rs.router_hash_id AND bout.peer_addr = rs.peer_addr AND bout.afi = rs.afi AND bout.direction = 'out' LEFT JOIN (SELECT bp.router_hash_id, bp.peer_addr, CASE WHEN ir.isipv4 THEN 'ipv4' ELSE 'ipv6' END AS afi, count(*) AS received FROM ip_rib ir JOIN bgp_peers bp ON bp.hash_id = ir.peer_hash_id WHERE ir.iswithdrawn = false GROUP BY bp.router_hash_id, bp.peer_addr, afi) rcv ON rcv.router_hash_id = rs.router_hash_id AND rcv.peer_addr = rs.peer_addr AND rcv.afi = rs.afi WHERE r.name IN ($router) ORDER BY r.name, rs.peer_addr, rs.afi","refId": "A"}],
"title": "Per-Neighbor Policy Diff","type": "table"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Prefixes received (BMP) but not accepted into the BGP table, by router.",
"fieldConfig": {"defaults": {"color": {"mode": "palette-classic"},"custom": {"lineWidth": 1,"fillOpacity": 80,"axisPlacement": "auto"}},"overrides": []},
"gridPos": {"h": 8,"w": 12,"x": 0,"y": 16},
"id": 6,
"options": {"orientation": "horizontal","showValue": "auto","xField": "Router","legend": {"showLegend": false},"tooltip": {"mode": "single"}},
"pluginVersion": "9.1.7",
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT r.name AS \"Router\", sum(GREATEST(COALESCE(rcv.received,0) - COALESCE(rs.accepted_count,0), 0)) AS \"Rejected\" FROM router_rib_stats rs JOIN routers r ON r.hash_id = rs.router_hash_id LEFT JOIN (SELECT bp.router_hash_id, bp.peer_addr, CASE WHEN ir.isipv4 THEN 'ipv4' ELSE 'ipv6' END AS afi, count(*) AS received FROM ip_rib ir JOIN bgp_peers bp ON bp.hash_id = ir.peer_hash_id WHERE ir.iswithdrawn = false GROUP BY bp.router_hash_id, bp.peer_addr, afi) rcv ON rcv.router_hash_id = rs.router_hash_id AND rcv.peer_addr = rs.peer_addr AND rcv.afi = rs.afi WHERE r.name IN ($router) GROUP BY r.name ORDER BY r.name","refId": "A"}],
"title": "Rejected by Router","type": "barchart"
},
{
"datasource": {"type": "postgres","uid": "obmp_postgres"},
"description": "Full route-policy (RPL) bodies retrieved from the routers via NETCONF. The body is what the heuristic attribution engine would parse in a later phase.",
"fieldConfig": {"defaults": {"custom": {"align": "auto","displayMode": "auto"}},"overrides": []},
"gridPos": {"h": 8,"w": 12,"x": 12,"y": 16},
"id": 7,
"options": {"showHeader": true,"sortBy": [{"desc": false,"displayName": "Router"}]},
"targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT r.name AS \"Router\", rp.policy_name AS \"Policy\", rp.body AS \"RPL Body\", rp.retrieved_at AS \"Retrieved\" FROM route_policies rp JOIN routers r ON r.hash_id = rp.router_hash_id WHERE r.name IN ($router) ORDER BY r.name, rp.policy_name","refId": "A"}],
"title": "Route Policies (RPL)","type": "table"
}
],
"schemaVersion": 36,
"style": "dark",
"tags": ["obmp", "obmp-nav", "bgp", "policy"],
"templating": {
"list": [
{"name": "router","type": "query","label": "Router","datasource": {"type": "postgres","uid": "obmp_postgres"},"query": "SELECT name FROM routers ORDER BY name","definition": "SELECT name FROM routers ORDER BY name","refresh": 1,"includeAll": true,"multi": true,"current": {"selected": true,"text": ["All"],"value": ["$__all"]},"options": [],"sort": 1,"hide": 0}
]
},
"time": {"from": "now-6h","to": "now"},
"timepicker": {},
"timezone": "",
"title": "Policy Diff",
"uid": "policy-diff",
"version": 1,
"weekStart": ""
}

View File

@ -0,0 +1,8 @@
# Container image for the obmp-rib-poller service (runs poller.py).
FROM python:3.12-slim
WORKDIR /app
# requirements.txt is copied and installed before the source file so that a
# change to poller.py alone does not invalidate the dependency layer.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY poller.py .
# -u: unbuffered stdout/stderr, so the poller's log lines are emitted as they
# are printed.
CMD ["python", "-u", "poller.py"]

297
obmp-rib-poller/poller.py Normal file
View File

@ -0,0 +1,297 @@
#!/usr/bin/env python3
"""obmp-rib-poller -- per-router BGP policy-diff collector.
BMP on the lab's XRv9000 24.3.1 routers only carries pre-policy Adj-RIB-In:
`route-monitoring inbound post-policy` replaces (does not supplement) the
pre-policy export and is not flagged distinctly, and adj-rib-out BMP is
unsupported on that image. So the "routes kept after inbound policy" and
"routes advertised" datasets are pulled directly from the routers:
  * CLI (SSH / paramiko)
      - `show bgp <af> unicast summary`: per-neighbor accepted prefix count
        and session state
      - `show bgp <af> unicast neighbors <n> advertised-count`: adj-rib-out
        size
      - `show running-config router bgp`: inbound/outbound route-policy
        bindings
  * NETCONF (ncclient)
      - Cisco-IOS-XR-policy-repository-cfg: full route-policy bodies
Results land in route_policies / neighbor_policy_bind / router_rib_stats
(postgres/scripts/008_obmp_policy_diff.sql). The Policy Diff dashboard joins
router_rib_stats against BMP ip_rib (received, pre-policy) to show received
vs kept vs discarded vs advertised, with the bound policy names.
gNMI was evaluated but is not used: gRPC dial-in is unreachable on the R9K
routers (only the cores answer on 57400). CLI + NETCONF work on every router.
"""
import os
import re
import time
import xml.etree.ElementTree as ET
import paramiko
import psycopg2
from ncclient import manager
# Runtime settings; every value below can be overridden from the environment.
PG_DSN = os.getenv(
    "PG_DSN",
    "host=10.40.40.202 port=5432 dbname=openbmp user=openbmp password=openbmp",
)
# Seconds between the starts of two polling cycles.
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "900"))
R_USER = os.getenv("ROUTER_USER", "webui")
R_PASS = os.getenv("ROUTER_PASS", "cisco")


def _lab_inventory(lab, subnet, user, pwd):
    """Return the (name, mgmt_ip, ssh_user, ssh_pass) rows for one lab.

    Each lab has two cores at .100/.200 followed by seven R9K clients at
    .1-.7 of *subnet*.
    """
    cores = [(f"{lab}-CORE-{n:02d}", f"{subnet}.{n * 100}", user, pwd)
             for n in (1, 2)]
    clients = [(f"{lab}-R9K-{n:02d}", f"{subnet}.{n}", user, pwd)
               for n in range(1, 8)]
    return cores + clients


# Router inventory: (name, mgmt_ip, ssh_user, ssh_pass) for both labs.
# CML uses webui/cisco (the ROUTER_USER/ROUTER_PASS env defaults); the PROX
# cores only accept admin/cisco, which also works on the PROX clients, so
# admin/cisco is used for the whole PROX lab.
ROUTERS = [
    *_lab_inventory("CML", "10.100.0", R_USER, R_PASS),
    *_lab_inventory("PROX", "10.100.1", "admin", "cisco"),
]
# --------------------------------------------------------------------------
# Router I/O
# --------------------------------------------------------------------------
def ssh_run(ip, cmds, user, pwd):
    """Open one interactive shell, run each command, return {cmd: output}.

    Args:
        ip: address of the router to connect to.
        cmds: command strings, sent one at a time in order.
        user: SSH username.
        pwd: SSH password (key files and the SSH agent are not used).

    Returns:
        A dict mapping each command string to the text read from the shell
        after it was sent. Reading stops once the accumulated text ends in
        "#" (taken to be the prompt). If that has not happened after 90 s
        the text collected so far is stored as-is, so a caller cannot tell
        a truncated result from a complete one.

    Raises:
        Whatever paramiko raises for connection or authentication failures.
    """
    cli = paramiko.SSHClient()
    cli.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # connect() sits inside the try so that a failed connect or auth
        # still reaches close() and does not leave a half-open transport.
        cli.connect(ip, username=user, password=pwd, timeout=20,
                    look_for_keys=False, allow_agent=False)
        sh = cli.invoke_shell(width=240, height=8000)

        # Discard the login banner / first prompt.
        time.sleep(2)
        if sh.recv_ready():
            sh.recv(65535)

        # Disable paging, then discard the echo of that command.
        sh.send("terminal length 0\n")
        time.sleep(1)
        if sh.recv_ready():
            sh.recv(65535)

        out = {}
        for cmd in cmds:
            sh.send(cmd + "\n")
            buf, start = "", time.time()
            while time.time() - start < 90:
                time.sleep(0.5)
                if sh.recv_ready():
                    buf += sh.recv(65535).decode(errors="replace")
                    if buf.rstrip().endswith("#"):
                        break
            out[cmd] = buf
        return out
    finally:
        cli.close()
def _policies_from_reply(xml_text):
    """Collect {policy_name: body} from every route-policy element in *xml_text*."""
    found = {}
    for node in ET.fromstring(xml_text).iter():
        tag = node.tag
        # Accept the element with or without an XML namespace prefix.
        if tag != "route-policy" and not tag.endswith("}route-policy"):
            continue
        pol_name = None
        pol_body = None
        for field in node:
            if field.tag.endswith("route-policy-name"):
                pol_name = (field.text or "").strip()
            elif field.tag.endswith("rpl-route-policy"):
                pol_body = field.text or ""
        # Entries without a (non-empty) name are ignored.
        if pol_name:
            found[pol_name] = pol_body
    return found


def fetch_policies(ip, user, pwd):
    """Return {policy_name: rpl_body} via NETCONF policy-repository-cfg.

    Connects to *ip* on port 830 with password authentication (host key not
    verified), reads the route-policies subtree of the running config and
    parses the reply. A policy that has a name but no body element maps to
    None. Connection and RPC errors propagate to the caller.
    """
    subtree = ('<routing-policy xmlns="http://cisco.com/ns/yang/'
               'Cisco-IOS-XR-policy-repository-cfg"><route-policies/>'
               '</routing-policy>')
    with manager.connect(host=ip, port=830, username=user, password=pwd,
                         hostkey_verify=False, device_params={"name": "iosxr"},
                         allow_agent=False, look_for_keys=False,
                         timeout=40) as nc:
        reply = nc.get_config(source="running", filter=("subtree", subtree))
    return _policies_from_reply(str(reply))
# --------------------------------------------------------------------------
# Parsers
# --------------------------------------------------------------------------
_ADDR_RE = re.compile(r"^[0-9A-Fa-f:.]+$")


def _is_addr(tok):
    """True if *tok* holds only hex digits, '.' and ':' and at least one separator."""
    return ("." in tok or ":" in tok) and _ADDR_RE.match(tok) is not None


def parse_summary(text):
    """Return (router_id, {neighbor: (peer_as, state, accepted_count|None)}).

    Parses the text of a `show bgp <af> unicast summary` command.

    * router_id is taken from the "BGP router identifier ..." line, or None
      if that line is absent.
    * Neighbor rows are read after the header line that starts with
      "Neighbor" and contains "St/PfxRcd". A row needs at least 10
      whitespace-separated columns, an address in the first and an integer
      AS in the third; anything else is skipped.
    * A numeric last column means the session is up: state is "Established"
      and the number is the accepted prefix count. Otherwise the last
      column(s) are the state text and the count is None.

    A neighbor address printed alone on one line, with the remaining nine
    columns on the next line (how long IPv6 addresses are wrapped), is
    joined back into a single row instead of being dropped.
    """
    router_id = None
    m = re.search(r"BGP router identifier ([0-9A-Fa-f:.]+)", text)
    if m:
        router_id = m.group(1)

    neighbors = {}
    in_table = False
    pending = None  # address seen alone on the previous line (wrapped row)
    for line in text.splitlines():
        if line.startswith("Neighbor") and "St/PfxRcd" in line:
            in_table = True
            pending = None
            continue
        if not in_table:
            continue

        toks = line.split()
        if len(toks) == 1 and _is_addr(toks[0]):
            pending = toks[0]
            continue
        # Re-attach a wrapped address only to a line that does not carry
        # its own address in the first column.
        if pending is not None and toks and not _is_addr(toks[0]):
            toks = [pending, *toks]
        pending = None

        if len(toks) < 10:
            continue
        ip = toks[0]
        if not _is_addr(ip):
            continue
        try:
            peer_as = int(toks[2])
        except ValueError:
            continue

        if toks[-1].isdigit():
            state, accepted = "Established", int(toks[-1])
        else:
            # Keep a state that spans several words (e.g. "Idle (Admin)")
            # whole rather than storing only its last word.
            state, accepted = " ".join(toks[9:]), None
        neighbors[ip] = (peer_as, state, accepted)
    return router_id, neighbors
def parse_bindings(text):
    """Return [(neighbor, afi, direction, policy_name)] from router-bgp config.

    Walks the text of `show running-config router bgp` line by line and
    tracks the current `neighbor <addr>` and `address-family` context. A
    `route-policy <name> in|out` line is recorded only while the context is
    an individual neighbor under `address-family ipv4|ipv6 unicast`; afi is
    "ipv4" or "ipv6" and direction is "in" or "out".

    Any other address-family (vpnv4, labeled-unicast, ...) clears the AF
    context, and a `neighbor-group`, `af-group`, `session-group` or `vrf`
    stanza clears the neighbor context, so policies bound there are not
    attributed to the previous neighbor / unicast AF.
    """
    binds = []
    cur_nbr = cur_af = None
    for raw in text.splitlines():
        line = raw.strip()

        m = re.match(r"neighbor (\S+)$", line)
        if m:
            cur_nbr, cur_af = m.group(1), None
            continue

        # A new non-neighbor stanza ends the previous neighbor's scope.
        if line.startswith(("neighbor-group ", "af-group ",
                            "session-group ", "vrf ")):
            cur_nbr = cur_af = None
            continue

        if line.startswith("address-family "):
            m = re.match(r"address-family (ipv4|ipv6) unicast$", line)
            # Only ipv4/ipv6 unicast is tracked; anything else must not
            # inherit the AF of the stanza before it.
            cur_af = m.group(1) if m else None
            continue

        m = re.match(r"route-policy (\S+) (in|out)$", line)
        if m and cur_nbr and cur_af:
            binds.append((cur_nbr, cur_af, m.group(2), m.group(1)))
    return binds
def parse_adv_count(text):
    """Return the integer following "prefixes Advertised:" in *text*, or None.

    Only the first occurrence is used; None means the marker was not found.
    """
    found = re.search(r"prefixes Advertised:\s*(\d+)", text)
    if found is None:
        return None
    return int(found.group(1))
# --------------------------------------------------------------------------
# Database
# --------------------------------------------------------------------------
def resolve_router_hash(cur, mgmt_ip, name):
    """Return the hash_id of the BMP `routers` row for this router, or None.

    The management IP is tried first because it is the reliable key (bgp_id
    is unpopulated and the BMP-reported names vary in case); a
    case-insensitive name match is the fallback. *cur* is an open DB cursor.
    """
    lookups = (
        ("SELECT hash_id FROM routers WHERE ip_address = %s", mgmt_ip),
        ("SELECT hash_id FROM routers WHERE lower(name) = lower(%s)", name),
    )
    for sql, arg in lookups:
        cur.execute(sql, (arg,))
        hit = cur.fetchone()
        if hit:
            return hit[0]
    return None
def poll_router(conn, name, ip, user, pwd):
    """Poll one router and replace its rows in the policy-diff tables.

    Steps:
      1. One SSH session for the IPv4/IPv6 BGP summaries and the router-bgp
         running config.
      2. A second SSH session for the per-neighbor advertised-count
         commands (skipped when no neighbors were parsed).
      3. NETCONF fetch of the route-policy bodies (best effort).
      4. In one transaction: delete and re-insert this router's rows in
         router_rib_stats and neighbor_policy_bind, and in route_policies
         when step 3 succeeded.

    Args:
        conn: open database connection; committed on success, rolled back
            when the router has no matching BMP `routers` row.
        name: inventory name, used for logging and as lookup fallback.
        ip: management address used for SSH, NETCONF and the routers lookup.
        user, pwd: device credentials.

    SSH and database errors propagate to the caller. A NETCONF failure is
    logged and leaves the previously stored route_policies rows untouched,
    rather than replacing them with nothing.
    """
    v4_cmd = "show bgp ipv4 unicast summary"
    v6_cmd = "show bgp ipv6 unicast summary"
    cfg_cmd = "show running-config router bgp"
    out = ssh_run(ip, [v4_cmd, v6_cmd, cfg_cmd], user, pwd)
    rid4, nbr4 = parse_summary(out[v4_cmd])
    rid6, nbr6 = parse_summary(out[v6_cmd])
    router_id = rid4 or rid6
    binds = parse_bindings(out[cfg_cmd])
    per_af = (("ipv4", nbr4), ("ipv6", nbr6))

    adv_cmds = [
        (afi, nb, f"show bgp {afi} unicast neighbors {nb} advertised-count")
        for afi, nbrs in per_af
        for nb in nbrs
    ]
    adv = {}
    if adv_cmds:
        adv_out = ssh_run(ip, [c for _, _, c in adv_cmds], user, pwd)
        for afi, nb, cmd in adv_cmds:
            adv[(afi, nb)] = parse_adv_count(adv_out.get(cmd, ""))

    # None (not {}) marks "fetch failed", so the stored policies are kept.
    policies = None
    try:
        policies = fetch_policies(ip, user, pwd)
    except Exception as e:
        print(f" [{name}] NETCONF policy fetch failed: {e}", flush=True)

    with conn.cursor() as cur:
        rh = resolve_router_hash(cur, ip, name)
        if not rh:
            print(f" [{name}] no matching BMP router (mgmt={ip}, "
                  f"router-id={router_id}); skipping DB write", flush=True)
            conn.rollback()
            return

        cur.execute("DELETE FROM router_rib_stats WHERE router_hash_id=%s", (rh,))
        for afi, nbrs in per_af:
            for nb, (peer_as, state, accepted) in nbrs.items():
                cur.execute(
                    "INSERT INTO router_rib_stats (router_hash_id, peer_addr, "
                    "afi, peer_as, session_state, accepted_count, "
                    "advertised_count, polled_at) "
                    "VALUES (%s,%s,%s,%s,%s,%s,%s, now())",
                    (rh, nb, afi, peer_as, state, accepted, adv.get((afi, nb))))

        cur.execute("DELETE FROM neighbor_policy_bind WHERE router_hash_id=%s", (rh,))
        for nb, afi, direction, pname in binds:
            cur.execute(
                "INSERT INTO neighbor_policy_bind (router_hash_id, peer_addr, "
                "afi, direction, policy_name, retrieved_at) "
                "VALUES (%s,%s,%s,%s,%s, now()) ON CONFLICT DO NOTHING",
                (rh, nb, afi, direction, pname))

        if policies is not None:
            cur.execute("DELETE FROM route_policies WHERE router_hash_id=%s", (rh,))
            for pname, body in policies.items():
                cur.execute(
                    "INSERT INTO route_policies (router_hash_id, policy_name, body, "
                    "retrieved_at) VALUES (%s,%s,%s, now())",
                    (rh, pname, body))
    conn.commit()

    if policies is not None:
        pol_note = f"{len(policies)} policies"
    else:
        pol_note = "stored policies left unchanged"
    print(f" [{name}] ok: {len(nbr4)} v4 + {len(nbr6)} v6 neighbors, "
          f"{len(binds)} policy bindings, {pol_note}", flush=True)
def main():
    """Run the polling loop forever.

    Each cycle opens a database connection, polls every router in ROUTERS
    in order, closes the connection and sleeps for whatever is left of
    POLL_INTERVAL. A failure on one router is logged and rolled back and
    the cycle moves on to the next router. If the rollback itself fails
    (the connection is gone) a new connection is opened for the remaining
    routers; if that also fails the rest of the cycle is abandoned instead
    of terminating the process.
    """
    print(f"obmp-rib-poller starting; interval={POLL_INTERVAL}s, "
          f"{len(ROUTERS)} routers", flush=True)
    while True:
        cycle_start = time.time()
        try:
            conn = psycopg2.connect(PG_DSN)
        except Exception as e:
            print(f"DB connect failed: {e}; retry in 30s", flush=True)
            time.sleep(30)
            continue

        try:
            for name, ip, user, pwd in ROUTERS:
                try:
                    poll_router(conn, name, ip, user, pwd)
                except Exception as e:
                    print(f" [{name}] poll failed: {e}", flush=True)
                    try:
                        conn.rollback()
                    except Exception as db_err:
                        # rollback() raises once the connection is dead;
                        # replace it so later routers can still be stored.
                        print(f"DB rollback failed: {db_err}; reconnecting",
                              flush=True)
                        try:
                            conn = psycopg2.connect(PG_DSN)
                        except Exception as e2:
                            print(f"DB reconnect failed: {e2}; "
                                  f"abandoning cycle", flush=True)
                            break
        finally:
            try:
                conn.close()
            except Exception:
                # The connection may already be unusable; nothing to release.
                pass

        elapsed = time.time() - cycle_start
        sleep_for = max(0, POLL_INTERVAL - elapsed)
        print(f"cycle done in {elapsed:.0f}s; sleeping {sleep_for:.0f}s", flush=True)
        time.sleep(sleep_for)


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,4 @@
paramiko==3.5.0
ncclient==0.6.15
psycopg2-binary==2.9.9
lxml==5.3.0

View File

@ -0,0 +1,53 @@
-- 008_obmp_policy_diff.sql
-- Policy Diff feature -- per-router routing-policy retrieval plus per-neighbor
-- received / kept / advertised counts, populated by the obmp-rib-poller
-- service (obmp-rib-poller/poller.py).
--
-- Why a poller and not BMP: on the lab's XRv9000 24.3.1 routers BMP only
-- carries pre-policy Adj-RIB-In. `route-monitoring inbound post-policy`
-- replaces (does not supplement) the pre-policy export and is not flagged
-- distinctly, and adj-rib-out BMP (RFC 8671) is unsupported on that image.
-- So "kept after inbound policy" and "advertised" are pulled directly from
-- the routers over CLI + NETCONF.
--
-- The Policy Diff dashboard joins router_rib_stats against BMP ip_rib
-- (received, pre-policy) to show received vs kept vs discarded vs advertised.
-- Full route-policy (RPL) bodies, one row per policy per router.
-- body is nullable: a policy can be stored with a name but no body.
CREATE TABLE IF NOT EXISTS route_policies (
    router_hash_id uuid NOT NULL,
    policy_name varchar(256) NOT NULL,
    body text,
    retrieved_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (router_hash_id, policy_name)
);

-- Which route-policy is bound inbound/outbound on each neighbor address-family.
CREATE TABLE IF NOT EXISTS neighbor_policy_bind (
    router_hash_id uuid NOT NULL,
    peer_addr inet NOT NULL,
    afi varchar(8) NOT NULL, -- ipv4 / ipv6
    direction varchar(4) NOT NULL, -- in / out
    policy_name varchar(256),
    retrieved_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (router_hash_id, peer_addr, afi, direction)
);

-- Per-neighbor RIB sizes pulled from the router: accepted_count is the
-- post-inbound-policy prefix count (BGP summary PfxRcd), advertised_count is
-- the adj-rib-out size toward that neighbor. Both are NULL when the poller
-- could not obtain a number (e.g. the session is not established).
CREATE TABLE IF NOT EXISTS router_rib_stats (
    router_hash_id uuid NOT NULL,
    peer_addr inet NOT NULL,
    afi varchar(8) NOT NULL,
    peer_as bigint,
    session_state varchar(32),
    accepted_count integer,
    advertised_count integer,
    polled_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (router_hash_id, peer_addr, afi)
);

-- No separate indexes on router_hash_id: it is the leading column of every
-- primary key above, so each primary-key B-tree already serves lookups, joins
-- and deletes by router_hash_id. A second single-column index would only add
-- write cost to the poller's delete-and-reinsert on every cycle.

View File

@ -0,0 +1,39 @@
-- 009_kafka_lag.sql
-- Kafka consumer-group lag history for the OpenBMP ingestion path, written by
-- the kafka-lag-monitor service every ~30s. Backs the Kafka Lag dashboard so
-- the ingestion path can be sanity-checked: watch lag spike during a BGP
-- convergence storm and drain again, and confirm the consumer member count
-- when psql-app is scaled out.
-- One row per (sample time, consumer group, topic, partition).
-- lag is the value the dashboards plot; committed and log_end are the two
-- offsets sampled alongside it.
CREATE TABLE IF NOT EXISTS kafka_consumer_lag (
    ts timestamptz NOT NULL DEFAULT now(),
    group_id varchar(128) NOT NULL,
    topic varchar(200) NOT NULL,
    partition integer NOT NULL,
    committed bigint,
    log_end bigint,
    lag bigint
);

-- One row per (sample time, consumer group): member count of the group.
CREATE TABLE IF NOT EXISTS kafka_consumer_members (
    ts timestamptz NOT NULL DEFAULT now(),
    group_id varchar(128) NOT NULL,
    members integer
);

SELECT create_hypertable('kafka_consumer_lag', 'ts', if_not_exists => TRUE);
SELECT create_hypertable('kafka_consumer_members', 'ts', if_not_exists => TRUE);

CREATE INDEX IF NOT EXISTS idx_kafka_lag_group_ts
    ON kafka_consumer_lag (group_id, ts DESC);
CREATE INDEX IF NOT EXISTS idx_kafka_members_group_ts
    ON kafka_consumer_members (group_id, ts DESC);

-- Bound history growth (~250k rows/day across ~88 partitions at 30s).
-- if_not_exists makes a re-run a no-op instead of an error. Each call has its
-- own exception block so the script still succeeds if the TimescaleDB job
-- scheduler is unavailable, and a failure on one table does not skip the other.
DO $$
BEGIN
    BEGIN
        PERFORM add_retention_policy('kafka_consumer_lag', INTERVAL '14 days',
                                     if_not_exists => TRUE);
    EXCEPTION WHEN OTHERS THEN
        RAISE NOTICE 'kafka lag retention policy not added: %', SQLERRM;
    END;
    BEGIN
        PERFORM add_retention_policy('kafka_consumer_members', INTERVAL '14 days',
                                     if_not_exists => TRUE);
    EXCEPTION WHEN OTHERS THEN
        RAISE NOTICE 'kafka members retention policy not added: %', SQLERRM;
    END;
END $$;