#!/usr/bin/env python3
"""obmp-rib-poller -- per-router BGP policy-diff collector.
BMP on the lab's XRv9000 24.3.1 routers only carries pre-policy Adj-RIB-In:
`route-monitoring inbound post-policy` replaces (does not supplement) the
pre-policy export and is not flagged distinctly, and adj-rib-out BMP is
unsupported on that image. So the "routes kept after inbound policy" and
"routes advertised" datasets are pulled directly from the routers:
  * CLI (SSH / paramiko)
      - `show bgp <af> unicast summary`:
        per-neighbor accepted prefix count + state
      - `show bgp <af> unicast neighbors <n> advertised-count`:
        adj-rib-out size
      - `show running-config router bgp`:
        inbound/outbound route-policy bindings
  * NETCONF (ncclient)
      - Cisco-IOS-XR-policy-repository-cfg:
        full route-policy bodies
Results land in route_policies / neighbor_policy_bind / router_rib_stats
(postgres/scripts/008_obmp_policy_diff.sql). The Policy Diff dashboard joins
router_rib_stats against BMP ip_rib (received, pre-policy) to show received
vs kept vs discarded vs advertised, with the bound policy names.
gNMI was evaluated but is not used: gRPC dial-in is unreachable on the R9K
routers (only the cores answer on 57400). CLI + NETCONF work on every router.
"""
import os
import re
import time
import xml.etree.ElementTree as ET
import paramiko
import psycopg2
from ncclient import manager
# Postgres DSN. An explicit $PG_DSN wins; otherwise the host falls back to
# $HOST_IP (docker compose passes it, and manual runs honour it too) and
# finally to localhost, so the poller is portable to another lab host without
# editing this file.
PG_DSN = os.environ.get(
    "PG_DSN",
    f"host={os.environ.get('HOST_IP', 'localhost')} port=5432 "
    "dbname=openbmp user=openbmp password=openbmp",
)
# Target length of one polling cycle, in seconds.
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "900"))
# Default router credentials (used for the CML lab entries below).
R_USER = os.environ.get("ROUTER_USER", "webui")
R_PASS = os.environ.get("ROUTER_PASS", "cisco")

# Router inventory: (name, mgmt_ip, ssh_user, ssh_pass) for both labs.
# CML uses webui/cisco (the ROUTER_USER/ROUTER_PASS env defaults); the PROX
# cores only accept admin/cisco, which also works on the PROX clients, so
# admin/cisco is used for the whole PROX lab.
ROUTERS = [
    ("CML-CORE-01", "10.100.0.100", R_USER, R_PASS),
    ("CML-CORE-02", "10.100.0.200", R_USER, R_PASS),
    ("CML-R9K-01", "10.100.0.1", R_USER, R_PASS),
    ("CML-R9K-02", "10.100.0.2", R_USER, R_PASS),
    ("CML-R9K-03", "10.100.0.3", R_USER, R_PASS),
    ("CML-R9K-04", "10.100.0.4", R_USER, R_PASS),
    ("CML-R9K-05", "10.100.0.5", R_USER, R_PASS),
    ("CML-R9K-06", "10.100.0.6", R_USER, R_PASS),
    ("CML-R9K-07", "10.100.0.7", R_USER, R_PASS),
    ("PROX-CORE-01", "10.100.1.100", "admin", "cisco"),
    ("PROX-CORE-02", "10.100.1.200", "admin", "cisco"),
    ("PROX-R9K-01", "10.100.1.1", "admin", "cisco"),
    ("PROX-R9K-02", "10.100.1.2", "admin", "cisco"),
    ("PROX-R9K-03", "10.100.1.3", "admin", "cisco"),
    ("PROX-R9K-04", "10.100.1.4", "admin", "cisco"),
    ("PROX-R9K-05", "10.100.1.5", "admin", "cisco"),
    ("PROX-R9K-06", "10.100.1.6", "admin", "cisco"),
    ("PROX-R9K-07", "10.100.1.7", "admin", "cisco"),
]
# --------------------------------------------------------------------------
# Router I/O
# --------------------------------------------------------------------------
def ssh_run(ip, cmds, user, pwd):
    """Open one interactive shell, run each command, return {cmd: output}.

    Args:
        ip: Address of the router to connect to.
        cmds: Iterable of CLI command strings, sent one at a time.
        user: SSH username.
        pwd: SSH password (key and agent authentication are disabled).

    Returns:
        Dict mapping each command string to the text received after sending
        it. A command is considered finished once the received text, with
        trailing whitespace removed, ends in "#"; if that does not happen
        within 90 seconds whatever was received so far is stored.

    Raises:
        Whatever paramiko raises on connect/authentication/channel errors.
        The client is closed in every case.
    """
    cli = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without checking.
    cli.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # connect() sits inside the try so that a failed connect or
        # authentication still reaches cli.close() and does not leak the
        # half-open client.
        cli.connect(ip, username=user, password=pwd, timeout=20,
                    look_for_keys=False, allow_agent=False)
        sh = cli.invoke_shell(width=240, height=8000)
        # Give the login banner/prompt time to arrive, then discard it.
        time.sleep(2)
        if sh.recv_ready():
            sh.recv(65535)
        # Disable paging so long outputs are not interrupted by "--More--".
        sh.send("terminal length 0\n")
        time.sleep(1)
        if sh.recv_ready():
            sh.recv(65535)
        out = {}
        for cmd in cmds:
            sh.send(cmd + "\n")
            # Collect raw bytes and decode once at the end: decoding each
            # chunk separately could split a multibyte character across two
            # recv() calls.
            raw = bytearray()
            deadline = time.monotonic() + 90
            while time.monotonic() < deadline:
                time.sleep(0.5)
                if sh.recv_ready():
                    raw += sh.recv(65535)
                    # A trailing "#" is taken as the CLI prompt returning.
                    if raw.rstrip().endswith(b"#"):
                        break
            out[cmd] = raw.decode(errors="replace")
        return out
    finally:
        cli.close()
def fetch_policies(ip, user, pwd):
    """Return {policy_name: rpl_body} via NETCONF policy-repository-cfg.

    Connects to port 830, requests the <route-policies/> subtree of the
    running configuration and walks the reply for route-policy elements.
    A policy is only recorded when it has a non-empty name; its body is the
    text of the rpl-route-policy child, or None when that child is absent.
    """
    subtree = (
        '<routing-policy xmlns="http://cisco.com/ns/yang/'
        'Cisco-IOS-XR-policy-repository-cfg"><route-policies/>'
        '</routing-policy>'
    )
    # NOTE(review): hostkey_verify=False skips SSH host-key verification.
    with manager.connect(host=ip, port=830, username=user, password=pwd,
                         hostkey_verify=False, device_params={"name": "iosxr"},
                         allow_agent=False, look_for_keys=False,
                         timeout=40) as m:
        reply = m.get_config(source="running", filter=("subtree", subtree))

    def is_policy_tag(tag):
        # Accept the element with or without an XML namespace prefix.
        return tag == "route-policy" or tag.endswith("}route-policy")

    found = {}
    tree = ET.fromstring(str(reply))
    for node in tree.iter():
        if not is_policy_tag(node.tag):
            continue
        policy_name = None
        rpl_text = None
        for leaf in node:
            leaf_tag = leaf.tag
            if leaf_tag.endswith("route-policy-name"):
                policy_name = (leaf.text or "").strip()
            elif leaf_tag.endswith("rpl-route-policy"):
                rpl_text = leaf.text or ""
        if policy_name:
            found[policy_name] = rpl_text
    return found
# --------------------------------------------------------------------------
# Parsers
# --------------------------------------------------------------------------
# Characters allowed in an IPv4/IPv6 neighbor address token.
_ADDR_RE = re.compile(r"[0-9A-Fa-f:.]+")
# Neighbor Spk AS MsgRcvd MsgSent TblVer InQ OutQ Up/Down St/PfxRcd
_SUMMARY_COLS = 10


def _looks_like_address(tok):
    """Return True when *tok* is shaped like an IPv4 or IPv6 address."""
    return ("." in tok or ":" in tok) and _ADDR_RE.fullmatch(tok) is not None


def parse_summary(text):
    """Return (router_id, {neighbor: (peer_as, state, accepted_count|None)}).

    Parses `show bgp <af> unicast summary` output.

    * router_id is taken from the "BGP router identifier" line, or None.
    * Neighbor rows are read only after the "Neighbor ... St/PfxRcd" header.
    * A numeric last column means the session is up: state is reported as
      "Established" and the number is the accepted prefix count. Otherwise
      the last column is the state and the count is None.
    * Rows whose AS column is not a plain integer are skipped.
    * When a neighbor address is printed alone on one line with the
      remaining columns on the following line (the layout used for long
      IPv6 addresses), the two lines are joined back into one row instead of
      being dropped.
    """
    router_id = None
    m = re.search(r"BGP router identifier ([0-9A-Fa-f:.]+)", text)
    if m:
        router_id = m.group(1)
    neighbors = {}
    in_table = False
    wrapped_addr = None  # address seen alone on the previous line, if any
    for line in text.splitlines():
        if line.startswith("Neighbor") and "St/PfxRcd" in line:
            in_table = True
            wrapped_addr = None
            continue
        if not in_table:
            continue
        toks = line.split()
        if len(toks) == 1 and _looks_like_address(toks[0]):
            # First half of a wrapped row; wait for the columns.
            wrapped_addr = toks[0]
            continue
        if wrapped_addr is not None:
            # Only treat this line as the continuation when it carries the
            # remaining columns and is not itself a complete neighbor row.
            if (len(toks) >= _SUMMARY_COLS - 1
                    and not _looks_like_address(toks[0])):
                toks.insert(0, wrapped_addr)
            wrapped_addr = None
        if len(toks) < _SUMMARY_COLS:
            continue
        ip = toks[0]
        if not _looks_like_address(ip):
            continue
        try:
            peer_as = int(toks[2])
        except ValueError:
            continue
        last = toks[-1]
        if last.isdigit():
            state, accepted = "Established", int(last)
        else:
            state, accepted = last, None
        neighbors[ip] = (peer_as, state, accepted)
    return router_id, neighbors
# Stanzas that can carry their own address-family/route-policy lines but are
# not a neighbor; entering one must end the current neighbor context.
_NON_NEIGHBOR_STANZA_RE = re.compile(
    r"(?:neighbor-group|af-group|session-group|vrf) \S+$")


def parse_bindings(text):
    """Return [(neighbor, afi, direction, policy_name)] from router-bgp config.

    Walks `show running-config router bgp` line by line, tracking the current
    neighbor and address-family, and records every `route-policy <name>
    in|out` found under a neighbor's ipv4/ipv6 unicast address-family.

    The context is reset whenever it stops applying, so policies are never
    attributed to the wrong neighbor or AFI:
      * any `address-family ...` line other than ipv4/ipv6 unicast clears
        the current AFI (its route-policies are ignored);
      * a neighbor-group / af-group / session-group / vrf stanza clears the
        current neighbor.
    """
    binds = []
    cur_nbr, cur_af = None, None
    for raw in text.splitlines():
        line = raw.strip()
        m = re.match(r"neighbor (\S+)$", line)
        if m:
            cur_nbr, cur_af = m.group(1), None
            continue
        if _NON_NEIGHBOR_STANZA_RE.match(line):
            cur_nbr, cur_af = None, None
            continue
        if line.startswith("address-family "):
            m = re.match(r"address-family (ipv4|ipv6) unicast$", line)
            # An unsupported family must not inherit the previous AFI.
            cur_af = m.group(1) if m else None
            continue
        m = re.match(r"route-policy (\S+) (in|out)$", line)
        if m and cur_nbr and cur_af:
            binds.append((cur_nbr, cur_af, m.group(2), m.group(1)))
    return binds
def parse_adv_count(text):
    """Return the integer following "prefixes Advertised:" in *text*.

    Returns None when the marker is not present.
    """
    hit = re.search(r"prefixes Advertised:\s*(\d+)", text)
    if hit is None:
        return None
    return int(hit.group(1))
# --------------------------------------------------------------------------
# Database
# --------------------------------------------------------------------------
def resolve_router_hash(cur, mgmt_ip, name):
    """Look up the hash_id of the BMP `routers` row for one router.

    The management IP (ip_address) is tried first because it is the reliable
    key: bgp_id is unpopulated and the BMP-reported names vary in case. If no
    row matches the IP, a case-insensitive name match is tried. Returns None
    when neither lookup finds a row.
    """
    lookups = (
        ("SELECT hash_id FROM routers WHERE ip_address = %s", mgmt_ip),
        ("SELECT hash_id FROM routers WHERE lower(name) = lower(%s)", name),
    )
    for sql, arg in lookups:
        cur.execute(sql, (arg,))
        hit = cur.fetchone()
        if hit:
            return hit[0]
    return None
def poll_router(conn, name, ip, user, pwd):
    """Poll one router over CLI + NETCONF and replace its rows in Postgres.

    Steps:
      1. SSH: BGP ipv4/ipv6 unicast summaries and the router-bgp config.
      2. SSH: one `advertised-count` command per neighbor found in step 1.
      3. NETCONF: route-policy bodies (best effort; a failure is logged and
         the rest of the poll continues).
      4. DB: in one transaction, replace this router's router_rib_stats and
         neighbor_policy_bind rows, and its route_policies rows when step 3
         succeeded.

    Args:
        conn: Open database connection; committed on success, rolled back
            when the router has no matching row in `routers`.
        name: Inventory name, used for log lines and the name fallback lookup.
        ip: Management IP used for SSH, NETCONF and the routers lookup.
        user: Router username.
        pwd: Router password.

    Raises:
        Any SSH, parsing or database error; the caller is expected to roll
        back and log.
    """
    summary_v4 = "show bgp ipv4 unicast summary"
    summary_v6 = "show bgp ipv6 unicast summary"
    bgp_config = "show running-config router bgp"
    out = ssh_run(ip, [summary_v4, summary_v6, bgp_config], user, pwd)
    rid4, nbr4 = parse_summary(out[summary_v4])
    rid6, nbr6 = parse_summary(out[summary_v6])
    router_id = rid4 or rid6
    binds = parse_bindings(out[bgp_config])

    adv_cmds = []
    for afi, nbrs in (("ipv4", nbr4), ("ipv6", nbr6)):
        for nb in nbrs:
            adv_cmds.append(
                (afi, nb,
                 f"show bgp {afi} unicast neighbors {nb} advertised-count"))
    adv = {}
    if adv_cmds:
        adv_out = ssh_run(ip, [c for _, _, c in adv_cmds], user, pwd)
        for afi, nb, cmd in adv_cmds:
            adv[(afi, nb)] = parse_adv_count(adv_out.get(cmd, ""))

    # None means "fetch failed", which is different from "router has no
    # policies" ({}): only the latter may clear the stored policy bodies.
    policies = None
    try:
        policies = fetch_policies(ip, user, pwd)
    except Exception as e:
        print(f" [{name}] NETCONF policy fetch failed: {e}", flush=True)

    with conn.cursor() as cur:
        rh = resolve_router_hash(cur, ip, name)
        if not rh:
            print(f" [{name}] no matching BMP router (mgmt={ip}, "
                  f"router-id={router_id}); skipping DB write", flush=True)
            conn.rollback()
            return
        cur.execute("DELETE FROM router_rib_stats WHERE router_hash_id=%s", (rh,))
        for afi, nbrs in (("ipv4", nbr4), ("ipv6", nbr6)):
            for nb, (peer_as, state, accepted) in nbrs.items():
                cur.execute(
                    "INSERT INTO router_rib_stats (router_hash_id, peer_addr, "
                    "afi, peer_as, session_state, accepted_count, "
                    "advertised_count, polled_at) "
                    "VALUES (%s,%s,%s,%s,%s,%s,%s, now())",
                    (rh, nb, afi, peer_as, state, accepted, adv.get((afi, nb))))
        cur.execute("DELETE FROM neighbor_policy_bind WHERE router_hash_id=%s", (rh,))
        for nb, afi, direction, pname in binds:
            cur.execute(
                "INSERT INTO neighbor_policy_bind (router_hash_id, peer_addr, "
                "afi, direction, policy_name, retrieved_at) "
                "VALUES (%s,%s,%s,%s,%s, now()) ON CONFLICT DO NOTHING",
                (rh, nb, afi, direction, pname))
        if policies is not None:
            # Replace policy bodies only after a successful fetch, so a
            # transient NETCONF failure keeps the previously stored rows.
            cur.execute("DELETE FROM route_policies WHERE router_hash_id=%s", (rh,))
            for pname, body in policies.items():
                cur.execute(
                    "INSERT INTO route_policies (router_hash_id, policy_name, body, "
                    "retrieved_at) VALUES (%s,%s,%s, now())",
                    (rh, pname, body))
    conn.commit()
    print(f" [{name}] ok: {len(nbr4)} v4 + {len(nbr6)} v6 neighbors, "
          f"{len(binds)} policy bindings, {len(policies or ())} policies", flush=True)
def main():
    """Run the polling loop forever.

    Each cycle opens a fresh database connection, polls every router in
    ROUTERS, closes the connection and sleeps for whatever remains of
    POLL_INTERVAL. A failure on one router is logged and rolled back without
    stopping the cycle. If the database cannot be reached (at connect time,
    or because the connection died mid-cycle) the loop retries after 30s.
    """
    print(f"obmp-rib-poller starting; interval={POLL_INTERVAL}s, "
          f"{len(ROUTERS)} routers", flush=True)
    while True:
        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        cycle_start = time.monotonic()
        try:
            conn = psycopg2.connect(PG_DSN)
        except Exception as e:
            print(f"DB connect failed: {e}; retry in 30s", flush=True)
            time.sleep(30)
            continue
        conn_lost = False
        try:
            for name, ip, user, pwd in ROUTERS:
                try:
                    poll_router(conn, name, ip, user, pwd)
                except Exception as e:
                    # Log first so the cause is visible even if the rollback
                    # below fails as well.
                    print(f" [{name}] poll failed: {e}", flush=True)
                    try:
                        conn.rollback()
                    except Exception as rb_err:
                        # rollback() raises on a dead connection; without
                        # this guard the whole poller would exit.
                        print(f"DB connection lost: {rb_err}; retry in 30s",
                              flush=True)
                        conn_lost = True
                        break
        finally:
            conn.close()
        if conn_lost:
            time.sleep(30)
            continue
        elapsed = time.monotonic() - cycle_start
        sleep_for = max(0, POLL_INTERVAL - elapsed)
        print(f"cycle done in {elapsed:.0f}s; sleeping {sleep_for:.0f}s", flush=True)
        time.sleep(sleep_for)


if __name__ == "__main__":
    main()