diff --git a/cml/gobgp_peering_config.py b/cml/gobgp_peering_config.py new file mode 100644 index 0000000..4e8b537 --- /dev/null +++ b/cml/gobgp_peering_config.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +"""Peer the CML core routers with the GoBGP full-table feed (roadmap E1). + +GoBGP (AS65001, 10.40.40.250) holds the full real Internet table pulled from +the Bromirski route server. This script configures CORE-01/CORE-02 (AS65020) +to peer eBGP with GoBGP and accept that table. As route reflectors the cores +then propagate it to every R9K client -- so all 9 lab routers carry and +BMP-export a full table. This is an intentional lab stress test of the +OpenBMP ingestion/storage path. + +Applied per core (additive -- no existing session/policy is modified): + * route-policy GOBGP-FEED-PASS (a plain `pass` policy; eBGP needs one) + * neighbor 10.40.40.202 remote-as 65001, ebgp-multihop, mgmt-sourced, + IPv4-unicast only, with a maximum-prefix safety cap. + +The matching GoBGP side is gobgp/gobgpd.conf (neighbors 10.100.0.100/.200); +restart GoBGP after applying: docker compose up -d gobgp + +IOS-XR BMP config is not exposed via NETCONF on this release, so -- like +cml/proxmox_bmp_config.py -- this applies config over the SSH CLI. + +Covers both labs: CML cores (AS65020) and PROX cores (AS65021). + +Usage: + python3 cml/gobgp_peering_config.py # apply, all 4 cores + python3 cml/gobgp_peering_config.py cml # apply, CML cores only + python3 cml/gobgp_peering_config.py prox # apply, PROX cores only + python3 cml/gobgp_peering_config.py --remove # ROLLBACK, all cores + python3 cml/gobgp_peering_config.py --remove prox # ROLLBACK, PROX only + +Rollback: `--remove` deletes the GoBGP neighbor and the GOBGP-FEED-PASS +policy from the cores. To stop the feed instantly without touching router +config, `docker compose stop gobgp` -- the eBGP sessions drop and the full +table is withdrawn fleet-wide within seconds. See gobgp/README.md. +""" +import sys +import time +import paramiko + +# GoBGP runs network_mode: host, so it sources BGP TCP from the host's real +# interface IP (10.40.40.202) -- NOT its router-id 10.40.40.250. The cores +# must peer with the host IP. +GOBGP_IP = "10.40.40.202" +GOBGP_AS = "65001" + +# Additive config, built per core (asn = that core's local BGP AS: +# CML lab = 65020, PROX lab = 65021). Flat formal-form lines applied at the +# (config)# prompt. +# IPv4-unicast only: the cores have no global IPv6 address, so an ipv6-unicast +# AF on this IPv4-transport session holds the whole neighbor Idle. The IPv6 +# full-table feed is a separate phase (needs a v6-transport session or v6 +# addressing on the cores). +def apply_lines(asn): + n = f"router bgp {asn} neighbor {GOBGP_IP}" + return [ + "route-policy GOBGP-FEED-PASS", + " pass", + "end-policy", + f"{n} remote-as {GOBGP_AS}", + f"{n} description GoBGP full-table feed (lab stress test)", + f"{n} ebgp-multihop 64", + f"{n} update-source MgmtEth0/RP0/CPU0/0", + f"{n} address-family ipv4 unicast route-policy GOBGP-FEED-PASS in", + f"{n} address-family ipv4 unicast route-policy GOBGP-FEED-PASS out", + f"{n} address-family ipv4 unicast maximum-prefix 1500000 90", + ] + + +# Rollback -- remove the neighbor (and its sub-config) then the policy. +def remove_lines(asn): + return [ + f"no router bgp {asn} neighbor {GOBGP_IP}", + "no route-policy GOBGP-FEED-PASS", + ] + + +# (name, mgmt_ip, user, password, local_asn) -- both labs. +CORES = [ + ("CML-CORE-01", "10.100.0.100", "webui", "cisco", "65020"), + ("CML-CORE-02", "10.100.0.200", "webui", "cisco", "65020"), + ("PROX-CORE-01", "10.100.1.100", "admin", "cisco", "65021"), + ("PROX-CORE-02", "10.100.1.200", "admin", "cisco", "65021"), +] + + +def _drain(shell, settle=0.4, limit=20.0, until=None): + out, start = "", time.time() + while time.time() - start < limit: + time.sleep(settle) + if shell.recv_ready(): + out += shell.recv(65535).decode(errors="replace") + if until and until in out: + break + elif until is None: + break + elif until in out: + break + return out + + +def configure_core(name, ip, user, pwd, asn, mode): + verb = "applying" if mode == "apply" else "removing" + lines = apply_lines(asn) if mode == "apply" else remove_lines(asn) + print(f"\n=== {name} ({ip}) AS{asn} -- {verb} GoBGP peering ===") + try: + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(ip, username=user, password=pwd, timeout=15, + look_for_keys=False, allow_agent=False) + shell = ssh.invoke_shell(width=220, height=2000) + time.sleep(2) + shell.recv(65535) + CFG = "(config)#" + + shell.send("terminal length 0\n") + _drain(shell, 0.4, 5) + shell.send("configure terminal\n") + out = _drain(shell, 0.4, 15, until=CFG) + if CFG not in out: + print(f" FAIL: could not enter config mode\n {out[-200:]}") + ssh.close() + return False + + for line in lines: + shell.send(line + "\n") + time.sleep(0.4) + _drain(shell, 0.3, 8, until=CFG) + + shell.send("show configuration\n") + cand = _drain(shell, 0.3, 10, until=CFG) + if GOBGP_IP not in cand and "GOBGP-FEED-PASS" not in cand: + print(f" OK: no changes ({mode} already in effect)") + shell.send("abort\n") + _drain(shell, 0.5, 5) + ssh.close() + return True + + shell.send("commit\n") + result = _drain(shell, 0.3, 25, until=CFG) + if "fail" in result.lower() or "error" in result.lower(): + print(f" FAIL: commit error\n {result[-300:]}") + shell.send("abort\n") + _drain(shell, 0.5, 5) + ssh.close() + return False + + shell.send("end\n") + _drain(shell, 1.0, 8) + if mode == "apply": + shell.send(f"show bgp ipv4 unicast neighbors {GOBGP_IP} | include BGP state\n") + verify = _drain(shell, 1.0, 12) + state = next((l.strip() for l in verify.splitlines() + if "BGP state" in l), "(state not yet reported)") + print(f" committed. {state}") + else: + shell.send(f"show running-config router bgp | include {GOBGP_IP}\n") + verify = _drain(shell, 1.0, 12) + gone = GOBGP_IP not in verify.replace(f"include {GOBGP_IP}", "") + print(f" committed. neighbor removed: {gone}") + ssh.close() + return True + except Exception as e: + print(f" FAIL: {e}") + return False + + +def main(): + args = [a for a in sys.argv[1:]] + mode = "apply" + if "--remove" in args: + mode = "remove" + args.remove("--remove") + target = args[0].lower() if args else None + + if mode == "remove": + print("ROLLBACK: removing GoBGP peering from the core routers.") + results = {} + for name, ip, user, pwd, asn in CORES: + if target and target not in name.lower(): + continue + results[name] = configure_core(name, ip, user, pwd, asn, mode) + + print(f"\n{'='*48}\n SUMMARY ({mode})") + for name, ok in results.items(): + print(f" {name:22s} {'OK' if ok else 'FAILED'}") + if mode == "apply": + print("\nNext: restart GoBGP to load the new neighbors:") + print(" docker compose up -d gobgp") + else: + print("\nGoBGP container config still lists the cores; that is inert") + print("with the neighbors removed. To fully revert, also restore the") + print("previous gobgp/gobgpd.conf and run: docker compose up -d gobgp") + sys.exit(0 if all(results.values()) else 1) + + +if __name__ == "__main__": + main() diff --git a/docker-compose.yml b/docker-compose.yml index 874c888..3763a64 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -484,6 +484,42 @@ services: - EVPN_TOPIC=openbmp.parsed.evpn - PG_DSN=host=obmp-psql port=5432 dbname=openbmp user=openbmp password=${POSTGRES_PASSWORD:-openbmp} + # Per-router BGP policy-diff collector. Pulls post-policy accepted/advertised + # prefix counts and route-policy bindings from the IOS-XR routers over CLI + + # NETCONF (BMP on XRv9000 24.3.1 only carries pre-policy Adj-RIB-In). Feeds + # the Policy Diff dashboard. Host networking: it must reach the lab + # management network (10.100.0.x) and the published Postgres port. + rib-poller: + restart: unless-stopped + container_name: obmp-rib-poller + build: + context: ./obmp-rib-poller + network_mode: host + depends_on: + - psql + environment: + - PG_DSN=host=10.40.40.202 port=5432 dbname=openbmp user=openbmp password=${POSTGRES_PASSWORD:-openbmp} + - POLL_INTERVAL=900 + - ROUTER_USER=webui + - ROUTER_PASS=cisco + + # Samples Kafka consumer-group lag into PostgreSQL every 30s for the Kafka + # Lag dashboard -- visibility into the ingestion path under load (e.g. a + # full-table BGP convergence storm) and a sanity check when scaling psql-app. + kafka-lag-monitor: + restart: unless-stopped + container_name: obmp-kafka-lag-monitor + build: + context: ./kafka-lag-monitor + depends_on: + - kafka + - psql + environment: + - KAFKA_BROKER=obmp-kafka:29092 + - PG_DSN=host=obmp-psql port=5432 dbname=openbmp user=openbmp password=${POSTGRES_PASSWORD:-openbmp} + - LAG_POLL_INTERVAL=30 + - CONSUMER_GROUPS=obmp-psql-consumer,evpn-psql + whois: restart: unless-stopped container_name: obmp-whois diff --git a/gobgp/README.md b/gobgp/README.md index 88b1b54..87cd475 100644 --- a/gobgp/README.md +++ b/gobgp/README.md @@ -60,6 +60,69 @@ GoBGP reports using its `router-id` (`10.40.40.250`) and `local-as` (`65001`): To find it in Grafana/SQL, filter on `peer_as = 57355` or the router-id above. +## Fleet-wide full-table feed into the CML lab (stress test) + +GoBGP additionally re-advertises the full table to the two CML core routers +(CORE-01/CORE-02, AS65020). As route reflectors the cores propagate it to all +seven R9K clients, so every lab router carries and BMP-exports a full table — +an intentional stress test of the OpenBMP ingestion/storage path (the database +grows toward ~55-65 GB). + +- **GoBGP side** — `gobgpd.conf` neighbors `10.100.0.100` / `10.100.0.200` + (peer-as 65020, eBGP-multihop, IPv4+IPv6, `prefix-limit` caps). The + route-server sessions carry `default-export-policy = "reject-route"` so the + lab's own routes can never leak back to AS57355. +- **Router side** — `cml/gobgp_peering_config.py` adds the `neighbor + 10.40.40.202` config (with `maximum-prefix 1.5M`/`400k` caps) to both cores. + GoBGP is host-networked, so it sources BGP TCP from the host IP + `10.40.40.202`, not its router-id `10.40.40.250` — the cores peer with the + host IP. + +### Apply + +```sh +python3 cml/gobgp_peering_config.py # configure both cores +docker compose up -d --force-recreate gobgp # load gobgpd.conf changes +``` + +> A volume-mounted config change does NOT trigger a recreate on its own — +> `--force-recreate` is required for GoBGP to re-read `gobgpd.conf`. + +### Rollback + +**Emergency stop** (fastest — feed off within seconds, no router change): + +```sh +docker compose stop gobgp +``` + +Stopping GoBGP drops the eBGP sessions; the cores withdraw the full table and +the withdrawal propagates to every client. The `ip_rib` rows are marked +withdrawn and aged out by the existing TimescaleDB retention. + +**Full revert** (also removes the router-side config): + +```sh +python3 cml/gobgp_peering_config.py --remove # delete neighbor from cores +docker compose stop gobgp +``` + +To keep the Bromirski feed running but drop only the lab injection, delete the +two `10.100.0.x` `[[neighbors]]` blocks from `gobgpd.conf` and +`docker compose up -d --force-recreate gobgp`. + +### What to watch during convergence + +```sh +docker exec obmp-gobgp gobgp neighbor # 4 sessions Establ +docker logs --tail 20 obmp-psql-app # consumer lag +docker exec obmp-psql psql -U openbmp -d openbmp -c \ + "SELECT count(*) FROM ip_rib WHERE iswithdrawn = false;" # row growth +``` + +If `psql-app` consumer lag climbs without draining, or PostgreSQL CPU/IO +saturates, use the emergency stop above. + ## MRT fallback AS57355 is a **single volunteer-run host with no SLA** — it can and does go diff --git a/gobgp/gobgpd.conf b/gobgp/gobgpd.conf index d5db2d7..6b92a31 100644 --- a/gobgp/gobgpd.conf +++ b/gobgp/gobgpd.conf @@ -5,20 +5,30 @@ # received route to the OpenBMP collector, where it lands in PostgreSQL ip_rib. # Peering spec: https://lukasz.bromirski.net/post/bgp-w-labie-3/ # -# Receive-only: we announce NOTHING -- AS57355 explicitly asks peers not to -# send prefixes. Local AS is 65001 (the value the route server expects). -# Per the spec: eBGP multihop, no password, keepalive 3600 / hold-time 7200. +# It ALSO re-advertises the full table to the two CML core routers +# (CORE-01/CORE-02, AS65020) over eBGP. As route reflectors the cores +# propagate it to every R9K client -- so all 9 lab routers carry and +# BMP-export a full table. This is an intentional lab stress test of the +# OpenBMP ingestion/storage path (~9x full feeds; DB grows to ~55-65 GB). +# +# Local AS is 65001 (the value the Bromirski route server expects). +# Bromirski peering: eBGP multihop, no password, keepalive 3600 / hold 7200. # TOML syntax targets GoBGP v3.x / v4.x. [global] [global.config] as = 65001 router-id = "10.40.40.250" - # We only originate outbound sessions to the route server; disable the - # inbound BGP listener (port -1) so the daemon needs no privileged - # (<1024) bind -- required under docker network_mode: host. + # We only originate outbound sessions (to the route server and to the + # two cores) so the inbound BGP listener stays disabled (port -1) -- no + # privileged (<1024) bind needed under docker network_mode: host. port = -1 +# Note: once we peer with the cores, GoBGP learns the cores' lab routes over +# eBGP. To guarantee none of that leaks back to AS57355 (which asks peers to +# announce NOTHING), the route-server sessions below carry an apply-policy +# with default-export-policy = "reject-route" -- every export is dropped. + # --- Neighbor: route server, IPv4 feed -------------------------------------- # The IPv4 transport session carries the full IPv4 table only. [[neighbors]] @@ -35,6 +45,9 @@ [neighbors.transport.config] # we initiate the session; no local-address pinning passive-mode = false + [neighbors.apply-policy.config] + # reject every export toward the route server + default-export-policy = "reject-route" [[neighbors.afi-safis]] [neighbors.afi-safis.config] afi-safi-name = "ipv4-unicast" @@ -54,10 +67,94 @@ multihop-ttl = 64 [neighbors.transport.config] passive-mode = false + [neighbors.apply-policy.config] + # reject every export toward the route server + default-export-policy = "reject-route" [[neighbors.afi-safis]] [neighbors.afi-safis.config] afi-safi-name = "ipv6-unicast" +# --- Neighbor: CML CORE-01 (AS65020) ---------------------------------------- +# GoBGP initiates outbound to the core's mgmt IP (reachable from the docker +# host -- the cores already reach the host for BMP). GoBGP sources the session +# from the host IP 10.40.40.202. eBGP multihop: the host is several hops from +# the core. Default export policy (accept) re-advertises the full Bromirski +# table to the core. prefix-limit is a safety cap on what the core can send +# back (its lab routes only -- small). +# IPv4-unicast only: the cores have no global IPv6 address, so an ipv6 AF +# would hold the session Idle. IPv6 full-table feed is a separate phase. +[[neighbors]] + [neighbors.config] + neighbor-address = "10.100.0.100" + peer-as = 65020 + description = "CML CORE-01 -- full-table injection (lab stress test)" + [neighbors.ebgp-multihop.config] + enabled = true + multihop-ttl = 64 + [neighbors.transport.config] + passive-mode = false + [[neighbors.afi-safis]] + [neighbors.afi-safis.config] + afi-safi-name = "ipv4-unicast" + [neighbors.afi-safis.prefix-limit.config] + max-prefixes = 2000000 + shutdown-threshold-pct = 90 + +# --- Neighbor: CML CORE-02 (AS65020) ---------------------------------------- +[[neighbors]] + [neighbors.config] + neighbor-address = "10.100.0.200" + peer-as = 65020 + description = "CML CORE-02 -- full-table injection (lab stress test)" + [neighbors.ebgp-multihop.config] + enabled = true + multihop-ttl = 64 + [neighbors.transport.config] + passive-mode = false + [[neighbors.afi-safis]] + [neighbors.afi-safis.config] + afi-safi-name = "ipv4-unicast" + [neighbors.afi-safis.prefix-limit.config] + max-prefixes = 2000000 + shutdown-threshold-pct = 90 + +# --- Neighbor: PROX CORE-01 (AS65021) --------------------------------------- +# Second lab. Same IPv4-unicast-only full-table injection as the CML cores. +[[neighbors]] + [neighbors.config] + neighbor-address = "10.100.1.100" + peer-as = 65021 + description = "PROX CORE-01 -- full-table injection (lab stress test)" + [neighbors.ebgp-multihop.config] + enabled = true + multihop-ttl = 64 + [neighbors.transport.config] + passive-mode = false + [[neighbors.afi-safis]] + [neighbors.afi-safis.config] + afi-safi-name = "ipv4-unicast" + [neighbors.afi-safis.prefix-limit.config] + max-prefixes = 2000000 + shutdown-threshold-pct = 90 + +# --- Neighbor: PROX CORE-02 (AS65021) --------------------------------------- +[[neighbors]] + [neighbors.config] + neighbor-address = "10.100.1.200" + peer-as = 65021 + description = "PROX CORE-02 -- full-table injection (lab stress test)" + [neighbors.ebgp-multihop.config] + enabled = true + multihop-ttl = 64 + [neighbors.transport.config] + passive-mode = false + [[neighbors.afi-safis]] + [neighbors.afi-safis.config] + afi-safi-name = "ipv4-unicast" + [neighbors.afi-safis.prefix-limit.config] + max-prefixes = 2000000 + shutdown-threshold-pct = 90 + # --- BMP export to the OpenBMP collector ------------------------------------ # GoBGP connects OUT to the collector. GoBGP's BMP config requires a literal # IP (it cannot resolve a hostname), so we target the docker host IP where the diff --git a/kafka-lag-monitor/Dockerfile b/kafka-lag-monitor/Dockerfile new file mode 100644 index 0000000..96fb4ef --- /dev/null +++ b/kafka-lag-monitor/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.12-slim + +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY monitor.py . + +CMD ["python", "-u", "monitor.py"] diff --git a/kafka-lag-monitor/monitor.py b/kafka-lag-monitor/monitor.py new file mode 100644 index 0000000..d35ed24 --- /dev/null +++ b/kafka-lag-monitor/monitor.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +"""kafka-lag-monitor -- samples Kafka consumer-group lag into PostgreSQL. + +Every LAG_POLL_INTERVAL seconds it records, per (group, topic, partition), the +committed offset, log-end offset and lag, plus the group's active member +count. The Kafka Lag dashboard reads kafka_consumer_lag / kafka_consumer_members +(postgres/scripts/009_kafka_lag.sql) so the ingestion path can be sanity- +checked -- watch lag spike during a BGP convergence storm and drain again, and +confirm the member count when psql-app is scaled out. +""" +import os +import time +import traceback + +import psycopg2 +from confluent_kafka import Consumer, TopicPartition, ConsumerGroupTopicPartitions +from confluent_kafka.admin import AdminClient + +BROKER = os.environ.get("KAFKA_BROKER", "obmp-kafka:29092") +PG_DSN = os.environ.get( + "PG_DSN", + "host=obmp-psql port=5432 dbname=openbmp user=openbmp password=openbmp", +) +INTERVAL = int(os.environ.get("LAG_POLL_INTERVAL", "30")) +GROUPS = [g.strip() for g in os.environ.get( + "CONSUMER_GROUPS", "obmp-psql-consumer,evpn-psql").split(",") if g.strip()] + + +def sample_group(admin, consumer, group): + """Return (lag_rows, member_count) for one consumer group. + + lag_rows: [(group, topic, partition, committed, log_end, lag), ...] + """ + futs = admin.list_consumer_group_offsets( + [ConsumerGroupTopicPartitions(group)]) + offsets = futs[group].result(timeout=30) + rows = [] + for tp in offsets.topic_partitions: + committed = tp.offset if (tp.offset is not None and tp.offset >= 0) else 0 + try: + _, log_end = consumer.get_watermark_offsets( + TopicPartition(tp.topic, tp.partition), timeout=10, cached=False) + except Exception: + continue + rows.append((group, tp.topic, tp.partition, committed, log_end, + max(log_end - committed, 0))) + + members = None + try: + desc = admin.describe_consumer_groups([group])[group].result(timeout=30) + members = len(desc.members) + except Exception: + pass + return rows, members + + +def main(): + print(f"kafka-lag-monitor starting; broker={BROKER}, groups={GROUPS}, " + f"interval={INTERVAL}s", flush=True) + admin = AdminClient({"bootstrap.servers": BROKER}) + consumer = Consumer({"bootstrap.servers": BROKER, + "group.id": "kafka-lag-monitor-probe", + "enable.auto.commit": False}) + while True: + start = time.time() + try: + conn = psycopg2.connect(PG_DSN) + with conn.cursor() as cur: + for group in GROUPS: + try: + rows, members = sample_group(admin, consumer, group) + except Exception as e: + print(f" [{group}] sample failed: {e}", flush=True) + continue + for r in rows: + cur.execute( + "INSERT INTO kafka_consumer_lag (group_id, topic, " + "partition, committed, log_end, lag) " + "VALUES (%s,%s,%s,%s,%s,%s)", r) + if members is not None: + cur.execute( + "INSERT INTO kafka_consumer_members (group_id, " + "members) VALUES (%s,%s)", (group, members)) + total = sum(r[5] for r in rows) + print(f" [{group}] {len(rows)} partitions, " + f"total lag={total}, members={members}", flush=True) + conn.commit() + conn.close() + except Exception as e: + print(f"cycle failed: {e}", flush=True) + traceback.print_exc() + time.sleep(max(0, INTERVAL - (time.time() - start))) + + +if __name__ == "__main__": + main() diff --git a/kafka-lag-monitor/requirements.txt b/kafka-lag-monitor/requirements.txt new file mode 100644 index 0000000..6c0eb7b --- /dev/null +++ b/kafka-lag-monitor/requirements.txt @@ -0,0 +1,2 @@ +confluent-kafka==2.5.3 +psycopg2-binary==2.9.9 diff --git a/obmp-grafana/dashboards/Telemetry-3001/kafka_lag.json b/obmp-grafana/dashboards/Telemetry-3001/kafka_lag.json new file mode 100644 index 0000000..be4cac1 --- /dev/null +++ b/obmp-grafana/dashboards/Telemetry-3001/kafka_lag.json @@ -0,0 +1,112 @@ +{ + "annotations": {"list": [{"builtIn": 1,"datasource": {"type": "datasource","uid": "grafana"},"enable": true,"hide": true,"iconColor": "rgba(0, 211, 255, 1)","name": "Annotations & Alerts","type": "dashboard"}]}, + "description": "Kafka consumer-group lag for the OpenBMP ingestion path, sampled every 30s by the kafka-lag-monitor service. Use it to sanity-check ingestion under load: lag spikes during a BGP convergence storm and should drain back to ~0; the consumer member count rises when psql-app is scaled out.", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [{"asDropdown": true,"icon": "external link","includeVars": true,"keepTime": true,"tags": ["obmp-nav"],"title": "OBMP Dashboards","type": "dashboards"}], + "liveNow": false, + "panels": [ + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Total lag across all partitions at the latest sample.", + "fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "yellow","value": 50000},{"color": "red","value": 1000000}]}},"overrides": []}, + "gridPos": {"h": 4,"w": 6,"x": 0,"y": 0}, + "id": 1, + "options": {"colorMode": "background","graphMode": "area","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT sum(lag) AS \"Total Lag\" FROM kafka_consumer_lag WHERE group_id = '$group' AND ts = (SELECT max(ts) FROM kafka_consumer_lag WHERE group_id = '$group')","refId": "A"}], + "title": "Current Total Lag","type": "stat" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Active consumer members in the group at the latest sample. Rises when psql-app is scaled out.", + "fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "blue","value": null}]}},"overrides": []}, + "gridPos": {"h": 4,"w": 6,"x": 6,"y": 0}, + "id": 2, + "options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT members AS \"Consumers\" FROM kafka_consumer_members WHERE group_id = '$group' ORDER BY ts DESC LIMIT 1","refId": "A"}], + "title": "Active Consumers","type": "stat" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Topic-partitions tracked for the group at the latest sample.", + "fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "purple","value": null}]}},"overrides": []}, + "gridPos": {"h": 4,"w": 6,"x": 12,"y": 0}, + "id": 3, + "options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT count(*) AS \"Partitions\" FROM kafka_consumer_lag WHERE group_id = '$group' AND ts = (SELECT max(ts) FROM kafka_consumer_lag WHERE group_id = '$group')","refId": "A"}], + "title": "Partitions Monitored","type": "stat" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Highest total lag observed in the selected time range.", + "fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "yellow","value": 50000},{"color": "red","value": 1000000}]}},"overrides": []}, + "gridPos": {"h": 4,"w": 6,"x": 18,"y": 0}, + "id": 4, + "options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT max(t.total) AS \"Peak Lag\" FROM (SELECT ts, sum(lag) AS total FROM kafka_consumer_lag WHERE group_id = '$group' AND $__timeFilter(ts) GROUP BY ts) t","refId": "A"}], + "title": "Peak Lag (range)","type": "stat" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Total consumer lag over time. A healthy ingestion path returns to near-zero after a burst; sustained growth means consumers cannot keep up.", + "fieldConfig": {"defaults": {"color": {"mode": "palette-classic"},"custom": {"axisPlacement": "auto","drawStyle": "line","fillOpacity": 25,"lineInterpolation": "smooth","lineWidth": 1,"showPoints": "never","spanNulls": true},"unit": "short"},"overrides": []}, + "gridPos": {"h": 9,"w": 12,"x": 0,"y": 4}, + "id": 5, + "options": {"legend": {"calcs": ["max","last"],"displayMode": "table","placement": "bottom","showLegend": true},"tooltip": {"mode": "multi","sort": "desc"}}, + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "time_series","rawSql": "SELECT ts AS time, sum(lag) AS \"Total lag\" FROM kafka_consumer_lag WHERE group_id = '$group' AND $__timeFilter(ts) GROUP BY ts ORDER BY ts","refId": "A"}], + "title": "Total Consumer Lag","type": "timeseries" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Active consumer members over time. Step changes correspond to psql-app scale events or rebalances.", + "fieldConfig": {"defaults": {"color": {"mode": "palette-classic"},"custom": {"axisPlacement": "auto","drawStyle": "line","fillOpacity": 15,"lineInterpolation": "stepAfter","lineWidth": 2,"showPoints": "never","spanNulls": true},"unit": "short"},"overrides": []}, + "gridPos": {"h": 9,"w": 12,"x": 12,"y": 4}, + "id": 6, + "options": {"legend": {"calcs": ["min","max"],"displayMode": "table","placement": "bottom","showLegend": true},"tooltip": {"mode": "single","sort": "none"}}, + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "time_series","rawSql": "SELECT ts AS time, members AS \"Consumers\" FROM kafka_consumer_members WHERE group_id = '$group' AND $__timeFilter(ts) ORDER BY ts","refId": "A"}], + "title": "Consumer Members","type": "timeseries" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Lag broken down by topic. unicast_prefix and base_attribute carry the BGP route churn and dominate during a convergence storm.", + "fieldConfig": {"defaults": {"color": {"mode": "palette-classic"},"custom": {"axisPlacement": "auto","drawStyle": "line","fillOpacity": 20,"lineInterpolation": "smooth","lineWidth": 1,"showPoints": "never","spanNulls": true,"stacking": {"group": "A","mode": "normal"}},"unit": "short"},"overrides": []}, + "gridPos": {"h": 9,"w": 12,"x": 0,"y": 13}, + "id": 7, + "options": {"legend": {"calcs": ["last"],"displayMode": "table","placement": "bottom","showLegend": true},"tooltip": {"mode": "multi","sort": "desc"}}, + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "time_series","rawSql": "SELECT ts AS time, topic AS metric, sum(lag) AS lag FROM kafka_consumer_lag WHERE group_id = '$group' AND $__timeFilter(ts) GROUP BY ts, topic ORDER BY ts","refId": "A"}], + "title": "Lag by Topic","type": "timeseries" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Per-partition lag for openbmp.parsed.unicast_prefix. A single deep partition that lags while others stay flat indicates a hot partition (skewed message keying) -- adding consumers gives it a dedicated thread but cannot split it.", + "fieldConfig": {"defaults": {"color": {"mode": "palette-classic"},"custom": {"axisPlacement": "auto","drawStyle": "line","fillOpacity": 10,"lineInterpolation": "smooth","lineWidth": 1,"showPoints": "never","spanNulls": true},"unit": "short"},"overrides": []}, + "gridPos": {"h": 9,"w": 12,"x": 12,"y": 13}, + "id": 8, + "options": {"legend": {"calcs": ["max","last"],"displayMode": "table","placement": "bottom","showLegend": true},"tooltip": {"mode": "multi","sort": "desc"}}, + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "time_series","rawSql": "SELECT ts AS time, 'p' || partition AS metric, lag FROM kafka_consumer_lag WHERE group_id = '$group' AND topic = 'openbmp.parsed.unicast_prefix' AND $__timeFilter(ts) ORDER BY ts","refId": "A"}], + "title": "Lag by Partition (unicast_prefix)","type": "timeseries" + } + ], + "refresh": "30s", + "schemaVersion": 36, + "style": "dark", + "tags": ["obmp", "obmp-nav", "telemetry", "kafka"], + "templating": { + "list": [ + {"name": "group","type": "query","label": "Consumer Group","datasource": {"type": "postgres","uid": "obmp_postgres"},"query": "SELECT DISTINCT group_id FROM kafka_consumer_members ORDER BY 1","definition": "SELECT DISTINCT group_id FROM kafka_consumer_members ORDER BY 1","refresh": 1,"includeAll": false,"multi": false,"current": {"selected": true,"text": "obmp-psql-consumer","value": "obmp-psql-consumer"},"options": [],"sort": 1,"hide": 0} + ] + }, + "time": {"from": "now-3h","to": "now"}, + "timepicker": {}, + "timezone": "", + "title": "Kafka Ingestion Lag", + "uid": "kafka-lag", + "version": 1, + "weekStart": "" +} diff --git a/obmp-grafana/dashboards/obmp/Base-1001/peer_detail.json b/obmp-grafana/dashboards/obmp/Base-1001/peer_detail.json index 70891f1..ed53525 100644 --- a/obmp-grafana/dashboards/obmp/Base-1001/peer_detail.json +++ b/obmp-grafana/dashboards/obmp/Base-1001/peer_detail.json @@ -1,6 +1,6 @@ { "annotations": {"list": [{"builtIn": 1,"datasource": {"type": "datasource","uid": "grafana"},"enable": true,"hide": true,"iconColor": "rgba(0, 211, 255, 1)","name": "Annotations & Alerts","type": "dashboard"}]}, - "description": "Per-peer drilldown — BGP session identity, state history, prefix counts, update/withdraw rate, recent events and negotiated capabilities for a single BGP peer.", + "description": "Per-peer drilldown — BGP session identity, state history, prefix counts, update/withdraw rate, recent events and negotiated capabilities for a single BGP session. The selector is router-qualified ('router -> peer'): prefix counts are routes RECEIVED from the selected peer (Adj-RIB-In). In a route-reflector mesh pick 'client -> core-loopback' to see a client's full received table; 'core -> client-loopback' shows only the client's originated routes.", "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 1, @@ -175,14 +175,14 @@ { "current": {}, "datasource": {"type": "postgres","uid": "obmp_postgres"}, - "definition": "select peername as __text, peer_hash_id as __value from v_peers where length(peername) > 0", + "definition": "select routername || ' -> ' || peername as __text, peer_hash_id as __value from v_peers where length(peername) > 0 order by 1", "hide": 0, "includeAll": false, - "label": "Peer", + "label": "Router -> Peer", "multi": false, "name": "peer_hash", "options": [], - "query": "select peername as __text, peer_hash_id as __value from v_peers where length(peername) > 0", + "query": "select routername || ' -> ' || peername as __text, peer_hash_id as __value from v_peers where length(peername) > 0 order by 1", "refresh": 1, "regex": "", "skipUrlSync": false, diff --git a/obmp-grafana/dashboards/obmp/History-1002/policy_diff.json b/obmp-grafana/dashboards/obmp/History-1002/policy_diff.json new file mode 100644 index 0000000..0342c2f --- /dev/null +++ b/obmp-grafana/dashboards/obmp/History-1002/policy_diff.json @@ -0,0 +1,102 @@ +{ + "annotations": {"list": [{"builtIn": 1,"datasource": {"type": "datasource","uid": "grafana"},"enable": true,"hide": true,"iconColor": "rgba(0, 211, 255, 1)","name": "Annotations & Alerts","type": "dashboard"}]}, + "description": "Per-router BGP policy diff: routes RECEIVED (BMP pre-policy Adj-RIB-In) vs KEPT (accepted into the BGP table, polled from the router) vs REJECTED, plus ADVERTISED counts and the bound inbound/outbound route-policy names. Kept/advertised data is collected by the obmp-rib-poller service over CLI+NETCONF because BMP on XRv9000 24.3.1 only carries pre-policy Adj-RIB-In. NOTE: Rejected = Received - Kept is everything BGP did not accept; inbound route-policy is one cause, alongside RR originator-id/cluster-list loop detection, AS-path loops and unreachable next-hops. Per-policy attribution is a future phase.", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [{"asDropdown": true,"icon": "external link","includeVars": true,"keepTime": true,"tags": ["obmp-nav"],"title": "OBMP Dashboards","type": "dashboards"}], + "liveNow": false, + "panels": [ + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Routers with poller data in the selected scope.", + "fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "blue","value": null}]}},"overrides": []}, + "gridPos": {"h": 4,"w": 6,"x": 0,"y": 0}, + "id": 1, + "options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT count(DISTINCT rs.router_hash_id) AS \"Routers\" FROM router_rib_stats rs JOIN routers r ON r.hash_id = rs.router_hash_id WHERE r.name IN ($router)","refId": "A"}], + "title": "Routers Polled","type": "stat" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Neighbor address-families tracked (one row per router/neighbor/AF).", + "fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "purple","value": null}]}},"overrides": []}, + "gridPos": {"h": 4,"w": 6,"x": 6,"y": 0}, + "id": 2, + "options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT count(*) AS \"Neighbor AFs\" FROM router_rib_stats rs JOIN routers r ON r.hash_id = rs.router_hash_id WHERE r.name IN ($router)","refId": "A"}], + "title": "Neighbor AFs Tracked","type": "stat" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Total prefixes received (BMP) minus kept (polled), summed where positive, across BMP-monitored neighbors. Includes inbound route-policy denies AND BGP loop/validation rejections (RR originator-id, AS-path loop, next-hop) -- not policy-only.", + "fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "orange","value": 1}]}},"overrides": []}, + "gridPos": {"h": 4,"w": 6,"x": 12,"y": 0}, + "id": 3, + "options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT COALESCE(sum(GREATEST(COALESCE(rcv.received,0) - COALESCE(rs.accepted_count,0), 0)),0) AS \"Rejected\" FROM router_rib_stats rs JOIN routers r ON r.hash_id = rs.router_hash_id LEFT JOIN (SELECT bp.router_hash_id, bp.peer_addr, CASE WHEN ir.isipv4 THEN 'ipv4' ELSE 'ipv6' END AS afi, count(*) AS received FROM ip_rib ir JOIN bgp_peers bp ON bp.hash_id = ir.peer_hash_id WHERE ir.iswithdrawn = false GROUP BY bp.router_hash_id, bp.peer_addr, afi) rcv ON rcv.router_hash_id = rs.router_hash_id AND rcv.peer_addr = rs.peer_addr AND rcv.afi = rs.afi WHERE r.name IN ($router)","refId": "A"}], + "title": "Total Rejected (Recv - Kept)","type": "stat" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Distinct route-policies (RPL) stored from the routers.", + "fieldConfig": {"defaults": {"color": {"mode": "thresholds"},"unit": "short","thresholds": {"mode": "absolute","steps": [{"color": "blue","value": null}]}},"overrides": []}, + "gridPos": {"h": 4,"w": 6,"x": 18,"y": 0}, + "id": 4, + "options": {"colorMode": "value","graphMode": "none","justifyMode": "auto","orientation": "auto","reduceOptions": {"calcs": ["lastNotNull"],"fields": "","values": false},"textMode": "auto"}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT count(*) AS \"Policies\" FROM route_policies rp JOIN routers r ON r.hash_id = rp.router_hash_id WHERE r.name IN ($router)","refId": "A"}], + "title": "Route Policies Stored","type": "stat" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Per neighbor address-family: Received = BMP pre-policy Adj-RIB-In count (blank if the neighbor is not BMP-monitored). Kept = prefixes accepted into the BGP table (polled). Rejected = Received - Kept (inbound route-policy denies plus BGP loop/validation rejections). Advertised = adj-rib-out size toward the neighbor. In/Out Policy = the bound route-policy names.", + "fieldConfig": {"defaults": {"custom": {"align": "auto","displayMode": "auto"}},"overrides": [{"matcher": {"id": "byName","options": "Rejected"},"properties": [{"id": "custom.displayMode","value": "color-text"},{"id": "thresholds","value": {"mode": "absolute","steps": [{"color": "text","value": null},{"color": "orange","value": 1}]}}]}]}, + "gridPos": {"h": 12,"w": 24,"x": 0,"y": 4}, + "id": 5, + "options": {"showHeader": true,"sortBy": [{"desc": true,"displayName": "Rejected"}]}, + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT r.name AS \"Router\", host(rs.peer_addr) AS \"Neighbor\", rs.peer_as AS \"Peer AS\", rs.afi AS \"AF\", rs.session_state AS \"State\", rcv.received AS \"Received (BMP)\", rs.accepted_count AS \"Kept\", rcv.received - rs.accepted_count AS \"Rejected\", rs.advertised_count AS \"Advertised\", bin.policy_name AS \"In-Policy\", bout.policy_name AS \"Out-Policy\", rs.polled_at AS \"Polled\" FROM router_rib_stats rs JOIN routers r ON r.hash_id = rs.router_hash_id LEFT JOIN neighbor_policy_bind bin ON bin.router_hash_id = rs.router_hash_id AND bin.peer_addr = rs.peer_addr AND bin.afi = rs.afi AND bin.direction = 'in' LEFT JOIN neighbor_policy_bind bout ON bout.router_hash_id = rs.router_hash_id AND bout.peer_addr = rs.peer_addr AND bout.afi = rs.afi AND bout.direction = 'out' LEFT JOIN (SELECT bp.router_hash_id, bp.peer_addr, CASE WHEN ir.isipv4 THEN 'ipv4' ELSE 'ipv6' END AS afi, count(*) AS received FROM ip_rib ir JOIN bgp_peers bp ON bp.hash_id = ir.peer_hash_id WHERE ir.iswithdrawn = false GROUP BY bp.router_hash_id, bp.peer_addr, afi) rcv ON rcv.router_hash_id = rs.router_hash_id AND rcv.peer_addr = rs.peer_addr AND rcv.afi = rs.afi WHERE r.name IN ($router) ORDER BY r.name, rs.peer_addr, rs.afi","refId": "A"}], + "title": "Per-Neighbor Policy Diff","type": "table" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Prefixes received (BMP) but not accepted into the BGP table, by router.", + "fieldConfig": {"defaults": {"color": {"mode": "palette-classic"},"custom": {"lineWidth": 1,"fillOpacity": 80,"axisPlacement": "auto"}},"overrides": []}, + "gridPos": {"h": 8,"w": 12,"x": 0,"y": 16}, + "id": 6, + "options": {"orientation": "horizontal","showValue": "auto","xField": "Router","legend": {"showLegend": false},"tooltip": {"mode": "single"}}, + "pluginVersion": "9.1.7", + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT r.name AS \"Router\", sum(GREATEST(COALESCE(rcv.received,0) - COALESCE(rs.accepted_count,0), 0)) AS \"Rejected\" FROM router_rib_stats rs JOIN routers r ON r.hash_id = rs.router_hash_id LEFT JOIN (SELECT bp.router_hash_id, bp.peer_addr, CASE WHEN ir.isipv4 THEN 'ipv4' ELSE 'ipv6' END AS afi, count(*) AS received FROM ip_rib ir JOIN bgp_peers bp ON bp.hash_id = ir.peer_hash_id WHERE ir.iswithdrawn = false GROUP BY bp.router_hash_id, bp.peer_addr, afi) rcv ON rcv.router_hash_id = rs.router_hash_id AND rcv.peer_addr = rs.peer_addr AND rcv.afi = rs.afi WHERE r.name IN ($router) GROUP BY r.name ORDER BY r.name","refId": "A"}], + "title": "Rejected by Router","type": "barchart" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Full route-policy (RPL) bodies retrieved from the routers via NETCONF. The body is what the heuristic attribution engine would parse in a later phase.", + "fieldConfig": {"defaults": {"custom": {"align": "auto","displayMode": "auto"}},"overrides": []}, + "gridPos": {"h": 8,"w": 12,"x": 12,"y": 16}, + "id": 7, + "options": {"showHeader": true,"sortBy": [{"desc": false,"displayName": "Router"}]}, + "targets": [{"datasource": {"type": "postgres","uid": "obmp_postgres"},"format": "table","rawSql": "SELECT r.name AS \"Router\", rp.policy_name AS \"Policy\", rp.body AS \"RPL Body\", rp.retrieved_at AS \"Retrieved\" FROM route_policies rp JOIN routers r ON r.hash_id = rp.router_hash_id WHERE r.name IN ($router) ORDER BY r.name, rp.policy_name","refId": "A"}], + "title": "Route Policies (RPL)","type": "table" + } + ], + "schemaVersion": 36, + "style": "dark", + "tags": ["obmp", "obmp-nav", "bgp", "policy"], + "templating": { + "list": [ + {"name": "router","type": "query","label": "Router","datasource": {"type": "postgres","uid": "obmp_postgres"},"query": "SELECT name FROM routers ORDER BY name","definition": "SELECT name FROM routers ORDER BY name","refresh": 1,"includeAll": true,"multi": true,"current": {"selected": true,"text": ["All"],"value": ["$__all"]},"options": [],"sort": 1,"hide": 0} + ] + }, + "time": {"from": "now-6h","to": "now"}, + "timepicker": {}, + "timezone": "", + "title": "Policy Diff", + "uid": "policy-diff", + "version": 1, + "weekStart": "" +} diff --git a/obmp-rib-poller/Dockerfile b/obmp-rib-poller/Dockerfile new file mode 100644 index 0000000..220eba5 --- /dev/null +++ b/obmp-rib-poller/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.12-slim + +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY poller.py . + +CMD ["python", "-u", "poller.py"] diff --git a/obmp-rib-poller/poller.py b/obmp-rib-poller/poller.py new file mode 100644 index 0000000..8c06d0b --- /dev/null +++ b/obmp-rib-poller/poller.py @@ -0,0 +1,297 @@ +#!/usr/bin/env python3 +"""obmp-rib-poller -- per-router BGP policy-diff collector. + +BMP on the lab's XRv9000 24.3.1 routers only carries pre-policy Adj-RIB-In: +`route-monitoring inbound post-policy` replaces (does not supplement) the +pre-policy export and is not flagged distinctly, and adj-rib-out BMP is +unsupported on that image. So the "routes kept after inbound policy" and +"routes advertised" datasets are pulled directly from the routers: + + * CLI (SSH / paramiko) + - `show bgp unicast summary` per-neighbor accepted + prefix count + state + - `show bgp unicast neighbors + advertised-count` adj-rib-out size + - `show running-config router bgp` inbound/outbound + route-policy bindings + * NETCONF (ncclient) + - Cisco-IOS-XR-policy-repository-cfg full route-policy bodies + +Results land in route_policies / neighbor_policy_bind / router_rib_stats +(postgres/scripts/008_obmp_policy_diff.sql). The Policy Diff dashboard joins +router_rib_stats against BMP ip_rib (received, pre-policy) to show received +vs kept vs discarded vs advertised, with the bound policy names. + +gNMI was evaluated but is not used: gRPC dial-in is unreachable on the R9K +routers (only the cores answer on 57400). CLI + NETCONF work on every router. +""" +import os +import re +import time +import xml.etree.ElementTree as ET + +import paramiko +import psycopg2 +from ncclient import manager + +PG_DSN = os.environ.get( + "PG_DSN", + "host=10.40.40.202 port=5432 dbname=openbmp user=openbmp password=openbmp", +) +POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "900")) +R_USER = os.environ.get("ROUTER_USER", "webui") +R_PASS = os.environ.get("ROUTER_PASS", "cisco") + +# Router inventory: (name, mgmt_ip, ssh_user, ssh_pass) for both labs. +# CML uses webui/cisco (the ROUTER_USER/ROUTER_PASS env defaults); the PROX +# cores only accept admin/cisco, which also works on the PROX clients, so +# admin/cisco is used for the whole PROX lab. +ROUTERS = [ + ("CML-CORE-01", "10.100.0.100", R_USER, R_PASS), + ("CML-CORE-02", "10.100.0.200", R_USER, R_PASS), + ("CML-R9K-01", "10.100.0.1", R_USER, R_PASS), + ("CML-R9K-02", "10.100.0.2", R_USER, R_PASS), + ("CML-R9K-03", "10.100.0.3", R_USER, R_PASS), + ("CML-R9K-04", "10.100.0.4", R_USER, R_PASS), + ("CML-R9K-05", "10.100.0.5", R_USER, R_PASS), + ("CML-R9K-06", "10.100.0.6", R_USER, R_PASS), + ("CML-R9K-07", "10.100.0.7", R_USER, R_PASS), + ("PROX-CORE-01", "10.100.1.100", "admin", "cisco"), + ("PROX-CORE-02", "10.100.1.200", "admin", "cisco"), + ("PROX-R9K-01", "10.100.1.1", "admin", "cisco"), + ("PROX-R9K-02", "10.100.1.2", "admin", "cisco"), + ("PROX-R9K-03", "10.100.1.3", "admin", "cisco"), + ("PROX-R9K-04", "10.100.1.4", "admin", "cisco"), + ("PROX-R9K-05", "10.100.1.5", "admin", "cisco"), + ("PROX-R9K-06", "10.100.1.6", "admin", "cisco"), + ("PROX-R9K-07", "10.100.1.7", "admin", "cisco"), +] + + +# -------------------------------------------------------------------------- +# Router I/O +# -------------------------------------------------------------------------- +def ssh_run(ip, cmds, user, pwd): + """Open one interactive shell, run each command, return {cmd: output}.""" + cli = paramiko.SSHClient() + cli.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + cli.connect(ip, username=user, password=pwd, timeout=20, + look_for_keys=False, allow_agent=False) + try: + sh = cli.invoke_shell(width=240, height=8000) + time.sleep(2) + if sh.recv_ready(): + sh.recv(65535) + sh.send("terminal length 0\n") + time.sleep(1) + if sh.recv_ready(): + sh.recv(65535) + out = {} + for cmd in cmds: + sh.send(cmd + "\n") + buf, start = "", time.time() + while time.time() - start < 90: + time.sleep(0.5) + if sh.recv_ready(): + buf += sh.recv(65535).decode(errors="replace") + if buf.rstrip().endswith("#"): + break + out[cmd] = buf + return out + finally: + cli.close() + + +def fetch_policies(ip, user, pwd): + """Return {policy_name: rpl_body} via NETCONF policy-repository-cfg.""" + filt = ("subtree", + '' + '') + with manager.connect(host=ip, port=830, username=user, password=pwd, + hostkey_verify=False, device_params={"name": "iosxr"}, + allow_agent=False, look_for_keys=False, + timeout=40) as m: + reply = m.get_config(source="running", filter=filt) + root = ET.fromstring(str(reply)) + policies = {} + for el in root.iter(): + if not el.tag.endswith("}route-policy") and el.tag != "route-policy": + continue + name = body = None + for child in el: + if child.tag.endswith("route-policy-name"): + name = (child.text or "").strip() + elif child.tag.endswith("rpl-route-policy"): + body = child.text or "" + if name: + policies[name] = body + return policies + + +# -------------------------------------------------------------------------- +# Parsers +# -------------------------------------------------------------------------- +def parse_summary(text): + """Return (router_id, {neighbor: (peer_as, state, accepted_count|None)}).""" + router_id = None + m = re.search(r"BGP router identifier ([0-9A-Fa-f:.]+)", text) + if m: + router_id = m.group(1) + neighbors = {} + in_table = False + for line in text.splitlines(): + if line.startswith("Neighbor") and "St/PfxRcd" in line: + in_table = True + continue + if not in_table: + continue + toks = line.split() + if len(toks) < 10: + continue + ip = toks[0] + if ("." not in ip and ":" not in ip) or not re.match(r"^[0-9A-Fa-f:.]+$", ip): + continue + try: + peer_as = int(toks[2]) + except ValueError: + continue + last = toks[-1] + if last.isdigit(): + state, accepted = "Established", int(last) + else: + state, accepted = last, None + neighbors[ip] = (peer_as, state, accepted) + return router_id, neighbors + + +def parse_bindings(text): + """Return [(neighbor, afi, direction, policy_name)] from router-bgp config.""" + binds = [] + cur_nbr, cur_af = None, None + for raw in text.splitlines(): + line = raw.strip() + m = re.match(r"neighbor (\S+)$", line) + if m: + cur_nbr, cur_af = m.group(1), None + continue + m = re.match(r"address-family (ipv4|ipv6) unicast$", line) + if m: + cur_af = m.group(1) + continue + m = re.match(r"route-policy (\S+) (in|out)$", line) + if m and cur_nbr and cur_af: + binds.append((cur_nbr, cur_af, m.group(2), m.group(1))) + return binds + + +def parse_adv_count(text): + m = re.search(r"prefixes Advertised:\s*(\d+)", text) + return int(m.group(1)) if m else None + + +# -------------------------------------------------------------------------- +# Database +# -------------------------------------------------------------------------- +def resolve_router_hash(cur, mgmt_ip, name): + """Match the BMP routers row. ip_address (management IP) is the reliable + key: bgp_id is unpopulated and the BMP-reported names vary in case.""" + cur.execute("SELECT hash_id FROM routers WHERE ip_address = %s", (mgmt_ip,)) + row = cur.fetchone() + if row: + return row[0] + cur.execute("SELECT hash_id FROM routers WHERE lower(name) = lower(%s)", (name,)) + row = cur.fetchone() + return row[0] if row else None + + +def poll_router(conn, name, ip, user, pwd): + out = ssh_run(ip, [ + "show bgp ipv4 unicast summary", + "show bgp ipv6 unicast summary", + "show running-config router bgp", + ], user, pwd) + rid4, nbr4 = parse_summary(out["show bgp ipv4 unicast summary"]) + rid6, nbr6 = parse_summary(out["show bgp ipv6 unicast summary"]) + router_id = rid4 or rid6 + binds = parse_bindings(out["show running-config router bgp"]) + + adv_cmds = [] + for nb in nbr4: + adv_cmds.append(("ipv4", nb, + f"show bgp ipv4 unicast neighbors {nb} advertised-count")) + for nb in nbr6: + adv_cmds.append(("ipv6", nb, + f"show bgp ipv6 unicast neighbors {nb} advertised-count")) + adv = {} + if adv_cmds: + adv_out = ssh_run(ip, [c for _, _, c in adv_cmds], user, pwd) + for afi, nb, cmd in adv_cmds: + adv[(afi, nb)] = parse_adv_count(adv_out.get(cmd, "")) + + policies = {} + try: + policies = fetch_policies(ip, user, pwd) + except Exception as e: + print(f" [{name}] NETCONF policy fetch failed: {e}", flush=True) + + with conn.cursor() as cur: + rh = resolve_router_hash(cur, ip, name) + if not rh: + print(f" [{name}] no matching BMP router (mgmt={ip}, " + f"router-id={router_id}); skipping DB write", flush=True) + conn.rollback() + return + cur.execute("DELETE FROM router_rib_stats WHERE router_hash_id=%s", (rh,)) + for afi, nbrs in (("ipv4", nbr4), ("ipv6", nbr6)): + for nb, (peer_as, state, accepted) in nbrs.items(): + cur.execute( + "INSERT INTO router_rib_stats (router_hash_id, peer_addr, " + "afi, peer_as, session_state, accepted_count, " + "advertised_count, polled_at) " + "VALUES (%s,%s,%s,%s,%s,%s,%s, now())", + (rh, nb, afi, peer_as, state, accepted, adv.get((afi, nb)))) + cur.execute("DELETE FROM neighbor_policy_bind WHERE router_hash_id=%s", (rh,)) + for nb, afi, direction, pname in binds: + cur.execute( + "INSERT INTO neighbor_policy_bind (router_hash_id, peer_addr, " + "afi, direction, policy_name, retrieved_at) " + "VALUES (%s,%s,%s,%s,%s, now()) ON CONFLICT DO NOTHING", + (rh, nb, afi, direction, pname)) + cur.execute("DELETE FROM route_policies WHERE router_hash_id=%s", (rh,)) + for pname, body in policies.items(): + cur.execute( + "INSERT INTO route_policies (router_hash_id, policy_name, body, " + "retrieved_at) VALUES (%s,%s,%s, now())", + (rh, pname, body)) + conn.commit() + print(f" [{name}] ok: {len(nbr4)} v4 + {len(nbr6)} v6 neighbors, " + f"{len(binds)} policy bindings, {len(policies)} policies", flush=True) + + +def main(): + print(f"obmp-rib-poller starting; interval={POLL_INTERVAL}s, " + f"{len(ROUTERS)} routers", flush=True) + while True: + cycle_start = time.time() + try: + conn = psycopg2.connect(PG_DSN) + except Exception as e: + print(f"DB connect failed: {e}; retry in 30s", flush=True) + time.sleep(30) + continue + for name, ip, user, pwd in ROUTERS: + try: + poll_router(conn, name, ip, user, pwd) + except Exception as e: + conn.rollback() + print(f" [{name}] poll failed: {e}", flush=True) + conn.close() + elapsed = time.time() - cycle_start + sleep_for = max(0, POLL_INTERVAL - elapsed) + print(f"cycle done in {elapsed:.0f}s; sleeping {sleep_for:.0f}s", flush=True) + time.sleep(sleep_for) + + +if __name__ == "__main__": + main() diff --git a/obmp-rib-poller/requirements.txt b/obmp-rib-poller/requirements.txt new file mode 100644 index 0000000..794d0da --- /dev/null +++ b/obmp-rib-poller/requirements.txt @@ -0,0 +1,4 @@ +paramiko==3.5.0 +ncclient==0.6.15 +psycopg2-binary==2.9.9 +lxml==5.3.0 diff --git a/postgres/scripts/008_obmp_policy_diff.sql b/postgres/scripts/008_obmp_policy_diff.sql new file mode 100644 index 0000000..610b495 --- /dev/null +++ b/postgres/scripts/008_obmp_policy_diff.sql @@ -0,0 +1,53 @@ +-- 008_obmp_policy_diff.sql +-- Policy Diff feature -- per-router routing-policy retrieval plus per-neighbor +-- received / kept / advertised counts, populated by the obmp-rib-poller +-- service (obmp-rib-poller/poller.py). +-- +-- Why a poller and not BMP: on the lab's XRv9000 24.3.1 routers BMP only +-- carries pre-policy Adj-RIB-In. `route-monitoring inbound post-policy` +-- replaces (does not supplement) the pre-policy export and is not flagged +-- distinctly, and adj-rib-out BMP (RFC 8671) is unsupported on that image. +-- So "kept after inbound policy" and "advertised" are pulled directly from +-- the routers over CLI + NETCONF. +-- +-- The Policy Diff dashboard joins router_rib_stats against BMP ip_rib +-- (received, pre-policy) to show received vs kept vs discarded vs advertised. + +-- Full route-policy (RPL) bodies, one row per policy per router. +CREATE TABLE IF NOT EXISTS route_policies ( + router_hash_id uuid NOT NULL, + policy_name varchar(256) NOT NULL, + body text, + retrieved_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (router_hash_id, policy_name) +); + +-- Which route-policy is bound inbound/outbound on each neighbor address-family. +CREATE TABLE IF NOT EXISTS neighbor_policy_bind ( + router_hash_id uuid NOT NULL, + peer_addr inet NOT NULL, + afi varchar(8) NOT NULL, -- ipv4 / ipv6 + direction varchar(4) NOT NULL, -- in / out + policy_name varchar(256), + retrieved_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (router_hash_id, peer_addr, afi, direction) +); + +-- Per-neighbor RIB sizes pulled from the router: accepted_count is the +-- post-inbound-policy prefix count (BGP summary PfxRcd), advertised_count is +-- the adj-rib-out size toward that neighbor. +CREATE TABLE IF NOT EXISTS router_rib_stats ( + router_hash_id uuid NOT NULL, + peer_addr inet NOT NULL, + afi varchar(8) NOT NULL, + peer_as bigint, + session_state varchar(32), + accepted_count integer, + advertised_count integer, + polled_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (router_hash_id, peer_addr, afi) +); + +CREATE INDEX IF NOT EXISTS idx_route_policies_router ON route_policies (router_hash_id); +CREATE INDEX IF NOT EXISTS idx_neighbor_policy_bind_router ON neighbor_policy_bind (router_hash_id); +CREATE INDEX IF NOT EXISTS idx_router_rib_stats_router ON router_rib_stats (router_hash_id); diff --git a/postgres/scripts/009_kafka_lag.sql b/postgres/scripts/009_kafka_lag.sql new file mode 100644 index 0000000..b179186 --- /dev/null +++ b/postgres/scripts/009_kafka_lag.sql @@ -0,0 +1,39 @@ +-- 009_kafka_lag.sql +-- Kafka consumer-group lag history for the OpenBMP ingestion path, written by +-- the kafka-lag-monitor service every ~30s. Backs the Kafka Lag dashboard so +-- the ingestion path can be sanity-checked: watch lag spike during a BGP +-- convergence storm and drain again, and confirm the consumer member count +-- when psql-app is scaled out. + +CREATE TABLE IF NOT EXISTS kafka_consumer_lag ( + ts timestamptz NOT NULL DEFAULT now(), + group_id varchar(128) NOT NULL, + topic varchar(200) NOT NULL, + partition integer NOT NULL, + committed bigint, + log_end bigint, + lag bigint +); + +CREATE TABLE IF NOT EXISTS kafka_consumer_members ( + ts timestamptz NOT NULL DEFAULT now(), + group_id varchar(128) NOT NULL, + members integer +); + +SELECT create_hypertable('kafka_consumer_lag', 'ts', if_not_exists => TRUE); +SELECT create_hypertable('kafka_consumer_members', 'ts', if_not_exists => TRUE); + +CREATE INDEX IF NOT EXISTS idx_kafka_lag_group_ts + ON kafka_consumer_lag (group_id, ts DESC); +CREATE INDEX IF NOT EXISTS idx_kafka_members_group_ts + ON kafka_consumer_members (group_id, ts DESC); + +-- Bound history growth (~250k rows/day across ~88 partitions at 30s). Wrapped +-- so the script still succeeds if the TimescaleDB job scheduler is unavailable. +DO $$ BEGIN + PERFORM add_retention_policy('kafka_consumer_lag', INTERVAL '14 days'); + PERFORM add_retention_policy('kafka_consumer_members', INTERVAL '14 days'); +EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'kafka lag retention policy not added: %', SQLERRM; +END $$;