#!/usr/bin/env python3
"""obmp-rib-poller -- per-router BGP policy-diff collector.

BMP on the lab's XRv9000 24.3.1 routers only carries pre-policy Adj-RIB-In:
`route-monitoring inbound post-policy` replaces (does not supplement) the
pre-policy export and is not flagged distinctly, and adj-rib-out BMP is
unsupported on that image. So the "routes kept after inbound policy" and
"routes advertised" datasets are pulled directly from the routers:

  * CLI (SSH / paramiko)
      - `show bgp {ipv4|ipv6} unicast summary`
            per-neighbor accepted prefix count + session state
      - `show bgp {ipv4|ipv6} unicast neighbors <addr> advertised-count`
            adj-rib-out size
      - `show running-config router bgp`
            inbound/outbound route-policy bindings
  * NETCONF (ncclient)
      - Cisco-IOS-XR-policy-repository-cfg
            full route-policy bodies

Results land in route_policies / neighbor_policy_bind / router_rib_stats
(postgres/scripts/008_obmp_policy_diff.sql). The Policy Diff dashboard joins
router_rib_stats against BMP ip_rib (received, pre-policy) to show received
vs kept vs discarded vs advertised, with the bound policy names.

gNMI was evaluated but is not used: gRPC dial-in is unreachable on the R9K
routers (only the cores answer on 57400). CLI + NETCONF work on every router.
"""

import os
import re
import time
import xml.etree.ElementTree as ET

import paramiko
import psycopg2
from ncclient import manager

# libpq connection string for the OpenBMP postgres database.
PG_DSN = os.environ.get(
    "PG_DSN",
    "host=10.40.40.202 port=5432 dbname=openbmp user=openbmp password=openbmp",
)
# Target seconds between the starts of consecutive polling cycles.
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "900"))
# Default router credentials (used for the CML lab).
R_USER = os.environ.get("ROUTER_USER", "webui")
R_PASS = os.environ.get("ROUTER_PASS", "cisco")

# Router inventory: (name, mgmt_ip, ssh_user, ssh_pass) for both labs.
# CML uses webui/cisco (the ROUTER_USER/ROUTER_PASS env defaults); the PROX
# cores only accept admin/cisco, which also works on the PROX clients, so
# admin/cisco is used for the whole PROX lab.
ROUTERS = [
    ("CML-CORE-01", "10.100.0.100", R_USER, R_PASS),
    ("CML-CORE-02", "10.100.0.200", R_USER, R_PASS),
    ("CML-R9K-01", "10.100.0.1", R_USER, R_PASS),
    ("CML-R9K-02", "10.100.0.2", R_USER, R_PASS),
    ("CML-R9K-03", "10.100.0.3", R_USER, R_PASS),
    ("CML-R9K-04", "10.100.0.4", R_USER, R_PASS),
    ("CML-R9K-05", "10.100.0.5", R_USER, R_PASS),
    ("CML-R9K-06", "10.100.0.6", R_USER, R_PASS),
    ("CML-R9K-07", "10.100.0.7", R_USER, R_PASS),
    ("PROX-CORE-01", "10.100.1.100", "admin", "cisco"),
    ("PROX-CORE-02", "10.100.1.200", "admin", "cisco"),
    ("PROX-R9K-01", "10.100.1.1", "admin", "cisco"),
    ("PROX-R9K-02", "10.100.1.2", "admin", "cisco"),
    ("PROX-R9K-03", "10.100.1.3", "admin", "cisco"),
    ("PROX-R9K-04", "10.100.1.4", "admin", "cisco"),
    ("PROX-R9K-05", "10.100.1.5", "admin", "cisco"),
    ("PROX-R9K-06", "10.100.1.6", "admin", "cisco"),
    ("PROX-R9K-07", "10.100.1.7", "admin", "cisco"),
]


# --------------------------------------------------------------------------
# Router I/O
# --------------------------------------------------------------------------

# XML namespace of the Cisco-IOS-XR-policy-repository-cfg YANG model.
_RPL_NS = "http://cisco.com/ns/yang/Cisco-IOS-XR-policy-repository-cfg"


def _read_until_prompt(sh, timeout, marker=None):
    """Read from interactive channel *sh* until an exec prompt appears.

    Returns ``(text, complete)``. ``complete`` becomes True once the output
    read so far ends with ``#`` and, when *marker* is given, also contains
    *marker* (normally the echoed command). Requiring the echo stops a stale
    prompt from a previous command from ending the read early. After
    *timeout* seconds, whatever was read is returned with ``complete=False``.

    Raw bytes are accumulated and decoded once at the end, so a multi-byte
    character split across two ``recv`` calls is not mangled.
    """
    raw = bytearray()
    want = marker.encode() if marker else None
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        time.sleep(0.5)
        while sh.recv_ready():
            raw += sh.recv(65535)
        if raw.rstrip().endswith(b"#") and (want is None or want in raw):
            return raw.decode(errors="replace"), True
    return raw.decode(errors="replace"), False


def ssh_run(ip, cmds, user, pwd):
    """Open one interactive shell, run each command, return {cmd: output}.

    The shell is first read up to its initial prompt, and paging is disabled
    with ``terminal length 0``. Each command's output is then read until the
    prompt returns, for at most 90 s per command. A command that times out
    keeps whatever partial output arrived, and the timeout is logged.
    Connection and authentication errors from paramiko propagate.
    """
    cli = paramiko.SSHClient()
    cli.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    cli.connect(ip, username=user, password=pwd, timeout=20,
                look_for_keys=False, allow_agent=False)
    try:
        sh = cli.invoke_shell(width=240, height=8000)
        # Consume the login banner and the first prompt, then the prompt that
        # follows `terminal length 0`. Neither can then be mistaken for the
        # end of the first real command's output.
        _read_until_prompt(sh, 20)
        sh.send("terminal length 0\n")
        _read_until_prompt(sh, 20, marker="terminal length 0")
        out = {}
        for cmd in cmds:
            sh.send(cmd + "\n")
            text, complete = _read_until_prompt(sh, 90, marker=cmd)
            if not complete:
                print(f" [{ip}] no prompt within 90s after {cmd!r}; "
                      f"output may be truncated", flush=True)
            out[cmd] = text
        return out
    finally:
        cli.close()


def _policy_filter_xml():
    """Return the NETCONF subtree filter that selects every route-policy.

    The filter is built with ElementTree and serializes as a
    ``routing-policy`` element carrying the default namespace _RPL_NS, with
    an empty ``route-policies`` child.
    """
    root = ET.Element("routing-policy", xmlns=_RPL_NS)
    ET.SubElement(root, "route-policies")
    return ET.tostring(root, encoding="unicode")


def fetch_policies(ip, user, pwd):
    """Return {policy_name: rpl_body} via NETCONF policy-repository-cfg.

    The running config is read with a subtree filter on the route-policy
    list. Each ``route-policy`` entry contributes its ``route-policy-name``
    and its ``rpl-route-policy`` text ("" when absent). Entries without a
    name are skipped. NETCONF/transport errors propagate to the caller.
    """
    filt = ("subtree", _policy_filter_xml())
    with manager.connect(host=ip, port=830, username=user, password=pwd,
                         hostkey_verify=False, device_params={"name": "iosxr"},
                         allow_agent=False, look_for_keys=False,
                         timeout=40) as m:
        reply = m.get_config(source="running", filter=filt)
    root = ET.fromstring(str(reply))
    policies = {}
    for el in root.iter():
        if not el.tag.endswith("}route-policy") and el.tag != "route-policy":
            continue
        name = body = None
        for child in el:
            if child.tag.endswith("route-policy-name"):
                name = (child.text or "").strip()
            elif child.tag.endswith("rpl-route-policy"):
                body = child.text or ""
        if name:
            policies[name] = body
    return policies


# --------------------------------------------------------------------------
# Parsers
# --------------------------------------------------------------------------

_ADDR_RE = re.compile(r"^[0-9A-Fa-f:.]+$")
_NBR_RE = re.compile(r"neighbor (\S+)$")
_AF_RE = re.compile(r"address-family (\S+)(?: (\S+))?$")
_BIND_RE = re.compile(r"route-policy (\S+) (in|out)$")


def _is_addr(tok):
    """True if *tok* looks like an IPv4/IPv6 address (hex digits, '.', ':')."""
    return ("." in tok or ":" in tok) and bool(_ADDR_RE.match(tok))


def _parse_asn(tok):
    """Parse an AS number in asplain ("65001") or asdot ("1.100") form.

    Raises ValueError for anything else.
    """
    if "." in tok:
        high, low = tok.split(".", 1)
        return int(high) * 65536 + int(low)
    return int(tok)


def parse_summary(text):
    """Return (router_id, {neighbor: (peer_as, state, accepted_count|None)}).

    Rows are read after the ``Neighbor ... St/PfxRcd`` header. The columns
    used are: address (col 0), AS (col 2), and St/PfxRcd (col 9 onward).
    A numeric St/PfxRcd means the session is Established with that many
    accepted prefixes. Otherwise the joined text (e.g. "Idle (Admin)") is
    the state, and the count is None. An address too wide for its column is
    printed alone on one line, with the other columns on the next line.
    Such a wrapped row is re-joined before parsing.
    """
    router_id = None
    m = re.search(r"BGP router identifier ([0-9A-Fa-f:.]+)", text)
    if m:
        router_id = m.group(1)
    neighbors = {}
    in_table = False
    wrapped_addr = None
    for line in text.splitlines():
        if line.startswith("Neighbor") and "St/PfxRcd" in line:
            in_table = True
            continue
        if not in_table:
            continue
        toks = line.split()
        if len(toks) == 1 and _is_addr(toks[0]):
            # First half of a wrapped row (typically a long IPv6 address).
            wrapped_addr = toks[0]
            continue
        if wrapped_addr is not None:
            if toks and not _is_addr(toks[0]):
                toks.insert(0, wrapped_addr)
            wrapped_addr = None
        if len(toks) < 10 or not _is_addr(toks[0]):
            continue
        try:
            peer_as = _parse_asn(toks[2])
        except ValueError:
            continue
        status = " ".join(toks[9:])
        if status.isdigit():
            state, accepted = "Established", int(status)
        else:
            state, accepted = status, None
        neighbors[toks[0]] = (peer_as, state, accepted)
    return router_id, neighbors


def parse_bindings(text):
    """Return [(neighbor, afi, direction, policy_name)] from router-bgp config.

    Only ``route-policy NAME in|out`` lines inside an ``address-family
    ipv4|ipv6 unicast`` stanza of a ``neighbor ADDR`` stanza are reported.
    Stanza scope follows the config's indentation. A non-blank line indented
    no deeper than the enclosing ``neighbor`` (or ``address-family``) line
    closes that stanza. This covers the next neighbor, ``neighbor-group``,
    ``af-group``, ``vrf``, and ``!``. As a result, lines from those sibling
    blocks are never credited to the preceding neighbor.
    Other address families (vpnv4, labeled-unicast, ...) are ignored.
    Policies inherited through ``use neighbor-group`` are not resolved.
    """
    binds = []
    cur_nbr = cur_af = None
    nbr_indent = af_indent = -1
    for raw in text.splitlines():
        line = raw.strip()
        if not line:
            continue
        indent = len(raw) - len(raw.lstrip())
        if cur_nbr is not None and indent <= nbr_indent:
            cur_nbr = cur_af = None
        elif cur_af is not None and indent <= af_indent:
            cur_af = None
        m = _NBR_RE.match(line)
        if m:
            cur_nbr, cur_af, nbr_indent = m.group(1), None, indent
            continue
        m = _AF_RE.match(line)
        if m:
            if (cur_nbr is not None and m.group(1) in ("ipv4", "ipv6")
                    and m.group(2) == "unicast"):
                cur_af, af_indent = m.group(1), indent
            else:
                cur_af = None
            continue
        m = _BIND_RE.match(line)
        if m and cur_nbr and cur_af:
            binds.append((cur_nbr, cur_af, m.group(2), m.group(1)))
    return binds


def parse_adv_count(text):
    """Return the int after 'prefixes Advertised:' in *text*, else None."""
    m = re.search(r"prefixes Advertised:\s*(\d+)", text)
    return int(m.group(1)) if m else None


# --------------------------------------------------------------------------
# Database
# --------------------------------------------------------------------------

def resolve_router_hash(cur, mgmt_ip, name):
    """Match the BMP routers row.

    ip_address (management IP) is the reliable key: bgp_id is unpopulated
    and the BMP-reported names vary in case. Falls back to a
    case-insensitive name match; returns None when neither matches.
    """
    cur.execute("SELECT hash_id FROM routers WHERE ip_address = %s",
                (mgmt_ip,))
    row = cur.fetchone()
    if row:
        return row[0]
    cur.execute("SELECT hash_id FROM routers WHERE lower(name) = lower(%s)",
                (name,))
    row = cur.fetchone()
    return row[0] if row else None


def poll_router(conn, name, ip, user, pwd):
    """Collect one router's BGP data and replace its rows in the database.

    Two SSH sessions are used: the first gathers the v4/v6 summaries and
    the BGP config, the second the per-neighbor advertised counts. Policy
    bodies are fetched over NETCONF. Then, in one transaction, the router's
    router_rib_stats and neighbor_policy_bind rows are replaced. Its
    route_policies rows are replaced only when the NETCONF fetch succeeded;
    a failed fetch keeps the previous rows rather than wiping them. SSH and
    DB errors propagate; the caller is expected to roll back.
    """
    summary_v4 = "show bgp ipv4 unicast summary"
    summary_v6 = "show bgp ipv6 unicast summary"
    bgp_config = "show running-config router bgp"
    out = ssh_run(ip, [summary_v4, summary_v6, bgp_config], user, pwd)
    rid4, nbr4 = parse_summary(out[summary_v4])
    rid6, nbr6 = parse_summary(out[summary_v6])
    router_id = rid4 or rid6
    binds = parse_bindings(out[bgp_config])

    adv_cmds = []
    for nb in nbr4:
        adv_cmds.append(
            ("ipv4", nb, f"show bgp ipv4 unicast neighbors {nb} advertised-count"))
    for nb in nbr6:
        adv_cmds.append(
            ("ipv6", nb, f"show bgp ipv6 unicast neighbors {nb} advertised-count"))
    adv = {}
    if adv_cmds:
        adv_out = ssh_run(ip, [c for _, _, c in adv_cmds], user, pwd)
        for afi, nb, cmd in adv_cmds:
            adv[(afi, nb)] = parse_adv_count(adv_out.get(cmd, ""))

    # None (not {}) marks a failed fetch so stored policies are preserved.
    policies = None
    try:
        policies = fetch_policies(ip, user, pwd)
    except Exception as e:
        print(f" [{name}] NETCONF policy fetch failed: {e}", flush=True)

    with conn.cursor() as cur:
        rh = resolve_router_hash(cur, ip, name)
        if not rh:
            print(f" [{name}] no matching BMP router (mgmt={ip}, "
                  f"router-id={router_id}); skipping DB write", flush=True)
            conn.rollback()
            return

        cur.execute("DELETE FROM router_rib_stats WHERE router_hash_id=%s",
                    (rh,))
        for afi, nbrs in (("ipv4", nbr4), ("ipv6", nbr6)):
            for nb, (peer_as, state, accepted) in nbrs.items():
                cur.execute(
                    "INSERT INTO router_rib_stats (router_hash_id, peer_addr, "
                    "afi, peer_as, session_state, accepted_count, "
                    "advertised_count, polled_at) "
                    "VALUES (%s,%s,%s,%s,%s,%s,%s, now())",
                    (rh, nb, afi, peer_as, state, accepted,
                     adv.get((afi, nb))))

        cur.execute("DELETE FROM neighbor_policy_bind WHERE router_hash_id=%s",
                    (rh,))
        for nb, afi, direction, pname in binds:
            cur.execute(
                "INSERT INTO neighbor_policy_bind (router_hash_id, peer_addr, "
                "afi, direction, policy_name, retrieved_at) "
                "VALUES (%s,%s,%s,%s,%s, now()) ON CONFLICT DO NOTHING",
                (rh, nb, afi, direction, pname))

        if policies is not None:
            cur.execute("DELETE FROM route_policies WHERE router_hash_id=%s",
                        (rh,))
            for pname, body in policies.items():
                cur.execute(
                    "INSERT INTO route_policies (router_hash_id, policy_name, "
                    "body, retrieved_at) VALUES (%s,%s,%s, now())",
                    (rh, pname, body))
    conn.commit()

    policy_note = (f"{len(policies)} policies" if policies is not None
                   else "policies unchanged")
    print(f" [{name}] ok: {len(nbr4)} v4 + {len(nbr6)} v6 neighbors, "
          f"{len(binds)} policy bindings, {policy_note}", flush=True)


def main():
    """Poll every router in ROUTERS forever, one DB connection per cycle.

    A failure on one router is logged and rolled back without stopping the
    cycle. If the rollback itself fails, the connection is unusable, so the
    rest of the cycle is skipped and a new connection is made next cycle.
    Cycles start roughly POLL_INTERVAL seconds apart.
    """
    print(f"obmp-rib-poller starting; interval={POLL_INTERVAL}s, "
          f"{len(ROUTERS)} routers", flush=True)
    while True:
        cycle_start = time.time()
        try:
            conn = psycopg2.connect(PG_DSN)
        except Exception as e:
            print(f"DB connect failed: {e}; retry in 30s", flush=True)
            time.sleep(30)
            continue
        try:
            for name, ip, user, pwd in ROUTERS:
                try:
                    poll_router(conn, name, ip, user, pwd)
                except Exception as e:
                    print(f" [{name}] poll failed: {e}", flush=True)
                    try:
                        conn.rollback()
                    except psycopg2.Error as db_err:
                        print(f"DB connection unusable ({db_err}); "
                              f"ending cycle early", flush=True)
                        break
        finally:
            conn.close()
        elapsed = time.time() - cycle_start
        sleep_for = max(0, POLL_INTERVAL - elapsed)
        print(f"cycle done in {elapsed:.0f}s; sleeping {sleep_for:.0f}s",
              flush=True)
        time.sleep(sleep_for)


if __name__ == "__main__":
    main()