2026-03-06 10:48:44 -07:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
"""
|
|
|
|
|
Cisco SNMP Walk Parser

Parses raw snmpbulkwalk -On -OQ output from a Cisco device and produces
a structured JSON file with interface details focused on a specific
remPortId (LLDP neighbor interface).

Usage:
    python3 cisco-parse.py <walk_file.txt> <remPortId> [output.json]

Examples:
    python3 cisco-parse.py walks/172-16-50-4_walk.txt Te1/1/3
    python3 cisco-parse.py walks/172-16-50-4_walk.txt Gi1/0/39 output.json
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
import re
|
|
|
|
|
import sys
|
|
|
|
|
import time
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# Cisco short-name <-> long-name mapping
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
# Interface-name prefix table: abbreviated prefix -> full prefix.
# Some IOS-XR entries map a name to itself so that already-long names are
# recognised as known prefixes.
CISCO_SHORT_TO_LONG = {
    # IOS-XE naming
    "Te": "TenGigabitEthernet",
    "Gi": "GigabitEthernet",
    "Fa": "FastEthernet",
    "Fo": "FortyGigabitEthernet",
    "Tw": "TwentyFiveGigE",
    "Twe": "TwentyFiveGigE",
    "Eth": "Ethernet",
    "Po": "Port-channel",
    "Vl": "Vlan",
    # IOS-XR naming (NCS 5500, ASR 9000, etc.)
    "TenGigE": "TenGigE",
    "HundredGigE": "HundredGigE",
    "TwentyFiveGigE": "TwentyFiveGigE",
    "FortyGigE": "FortyGigE",
    "GigE": "GigE",  # rare but exists on some XR platforms
    "Bundle-Ether": "Bundle-Ether",
    "BE": "Bundle-Ether",
    # Common to both
    "Hu": "HundredGigE",
    "BDI": "BDI",
    "BVI": "BVI",
    "Lo": "Loopback",
}

# Reverse table: full prefix -> abbreviated prefix, derived from the table
# above. When several abbreviations share one full name, the strictly
# shortest abbreviation is kept (the first one seen wins a length tie).
CISCO_LONG_TO_SHORT = {}
for _short, _long in CISCO_SHORT_TO_LONG.items():
    _known = CISCO_LONG_TO_SHORT.get(_long)
    if _known is not None and len(_known) <= len(_short):
        continue
    CISCO_LONG_TO_SHORT[_long] = _short
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# OID constants
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
# System scalars (sysDescr / sysUpTime / sysName), instance .0 included
_OID_SYSTEM = ".1.3.6.1.2.1.1"
OID_SYS_DESCR = _OID_SYSTEM + ".1.0"
OID_SYS_UPTIME = _OID_SYSTEM + ".3.0"
OID_SYS_NAME = _OID_SYSTEM + ".5.0"

# IF-MIB base column prefixes (append .{ifIndex})
_OID_IF_ENTRY = ".1.3.6.1.2.1.2.2.1"
OID_IF_DESCR = _OID_IF_ENTRY + ".2"
OID_IF_TYPE = _OID_IF_ENTRY + ".3"
OID_IF_MTU = _OID_IF_ENTRY + ".4"
OID_IF_SPEED = _OID_IF_ENTRY + ".5"
OID_IF_ADMIN_STATUS = _OID_IF_ENTRY + ".7"
OID_IF_OPER_STATUS = _OID_IF_ENTRY + ".8"
OID_IF_IN_OCTETS = _OID_IF_ENTRY + ".10"
OID_IF_IN_DISCARDS = _OID_IF_ENTRY + ".13"
OID_IF_IN_ERRORS = _OID_IF_ENTRY + ".14"
OID_IF_OUT_OCTETS = _OID_IF_ENTRY + ".16"
OID_IF_OUT_DISCARDS = _OID_IF_ENTRY + ".19"
OID_IF_OUT_ERRORS = _OID_IF_ENTRY + ".20"

# IF-MIB ifXTable column prefixes (append .{ifIndex})
_OID_IFX_ENTRY = ".1.3.6.1.2.1.31.1.1.1"
OID_IF_NAME = _OID_IFX_ENTRY + ".1"
OID_IF_HC_IN_OCTETS = _OID_IFX_ENTRY + ".6"
OID_IF_HC_OUT_OCTETS = _OID_IFX_ENTRY + ".10"
OID_IF_HIGH_SPEED = _OID_IFX_ENTRY + ".15"
OID_IF_ALIAS = _OID_IFX_ENTRY + ".18"

# ifStackTable: .1.3.6.1.2.1.31.1.2.1.3.{higher}.{lower}
OID_IF_STACK_STATUS = ".1.3.6.1.2.1.31.1.2.1.3"

# ENTITY-MIB
OID_ENT_PHYS_PREFIX = ".1.3.6.1.2.1.47.1.1.1.1"

# CISCO-ENTITY-SENSOR-MIB
OID_CISCO_SENSOR_PREFIX = ".1.3.6.1.4.1.9.9.91.1.1.1.1"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# Walk file parser
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def parse_walk_file(walk_file):
    """Parse an snmpbulkwalk -On -OQ output file into an {oid: value} dict.

    Lines look like:
        .1.3.6.1.2.1.2.2.1.2.62 = "TenGigabitEthernet1/1/3"
        .1.3.6.1.2.1.2.2.1.7.62 = 1

    Rules applied:
      * Blank lines, lines starting with '#', and lines without ' = ' are
        skipped.
      * Surrounding double quotes are stripped from string values.
      * A value that opens a quote without closing it on the same line
        (including a value that is only the opening quote) is treated as
        multi-line; following lines are appended, separated by one space,
        until a line containing a double quote is seen.
      * A multi-line value still open at end of file is stored with its
        leading/trailing quotes stripped.
      * If an OID appears more than once, the last value wins.

    Args:
        walk_file: Path (str or Path) of the walk output file.

    Returns:
        dict mapping each OID string to its value string.
    """
    oid_data = {}
    pending_oid = None
    pending_val = None

    with Path(walk_file).open("r", errors="replace") as fh:
        for raw_line in fh:
            line = raw_line.rstrip("\n\r")

            # Inside a multi-line quoted value (e.g. Cisco sysDescr).
            if pending_oid is not None:
                piece = line.strip()
                if pending_val == '"':
                    # Nothing but the opening quote so far: do not inject a
                    # leading space into the value.
                    pending_val += piece
                else:
                    pending_val += " " + piece

                if '"' in line:
                    # Closing quote found - finalize.
                    val = pending_val.strip()
                    if val.startswith('"'):
                        val = val[1:]
                    if val.endswith('"'):
                        val = val[:-1]
                    oid_data[pending_oid] = val
                    pending_oid = None
                    pending_val = None
                continue

            line = line.strip()
            if not line or line.startswith("#"):
                continue

            # Split on the first ' = '.
            oid, sep, value = line.partition(" = ")
            if not sep:
                continue
            oid = oid.strip()
            value = value.strip()

            # Opening quote without a closing one => multi-line value.
            # A lone '"' both starts and ends with a quote, so it needs the
            # explicit length check to be recognised as "still open".
            if value.startswith('"') and (len(value) == 1 or not value.endswith('"')):
                pending_oid = oid
                pending_val = value
                continue

            # Strip surrounding quotes.
            if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
                value = value[1:-1]

            oid_data[oid] = value

    # Unterminated multi-line value at end of file: keep what was collected.
    if pending_oid is not None:
        oid_data[pending_oid] = pending_val.strip().strip('"')

    return oid_data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# Helpers
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def _get(oid_data, oid, default=""):
    """Return the value stored for `oid`, or `default` when it is absent."""
    if oid in oid_data:
        return oid_data[oid]
    return default
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_subtree(oid_data, prefix):
    """Return the entries at or below `prefix`.

    An entry is kept when its OID equals `prefix` exactly or starts with
    `prefix` followed by a dot (a dot is appended unless `prefix` already
    ends with one), so '.1.2' does not pick up '.1.20'.
    """
    subtree_root = prefix
    if not subtree_root.endswith("."):
        subtree_root += "."

    selected = {}
    for key, val in oid_data.items():
        if key == prefix or key.startswith(subtree_root):
            selected[key] = val
    return selected
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _expand_short_name(short_name):
    """Expand an abbreviated Cisco interface name to its full form.

    'Te1/1/3'  -> 'TenGigabitEthernet1/1/3'
    'Gi1/0/39' -> 'GigabitEthernet1/0/39'

    Known prefixes from CISCO_SHORT_TO_LONG are tried longest first so that
    e.g. 'Twe' wins over 'Tw'. A prefix only counts when what follows it
    starts with a digit or '/'. The input is returned unchanged when no
    prefix qualifies.
    """
    candidates = sorted(CISCO_SHORT_TO_LONG, key=len, reverse=True)
    for abbrev in candidates:
        if not short_name.startswith(abbrev):
            continue
        tail = short_name[len(abbrev):]
        # tail[:1] is '' for an empty tail, which fails both checks.
        lead = tail[:1]
        if lead.isdigit() or lead == "/":
            return CISCO_SHORT_TO_LONG[abbrev] + tail
    return short_name
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _shorten_name(long_name):
    """Abbreviate a full Cisco interface name.

    'TenGigabitEthernet1/1/3' -> 'Te1/1/3'

    Full prefixes from CISCO_LONG_TO_SHORT are tried longest first; the first
    one that `long_name` starts with is replaced by its abbreviation. The
    input is returned unchanged when no prefix matches.
    """
    by_length = sorted(CISCO_LONG_TO_SHORT, key=len, reverse=True)
    for full in by_length:
        if long_name.startswith(full):
            return CISCO_LONG_TO_SHORT[full] + long_name[len(full):]
    return long_name
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# System info extraction
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def get_system_info(oid_data):
    """Extract system-level information from the walk data.

    Reads sysDescr, sysUpTime and sysName and derives two extra fields from
    the sysDescr text:
      * osType   - "IOS-XR", "IOS-XE" or "unknown"
      * platform - "Cisco <name> <osType> <version>" built from regex matches
                   on sysDescr; empty string when osType is "unknown".

    Args:
        oid_data: dict from parse_walk_file().

    Returns:
        dict with keys sysName, sysDescr, sysUpTime, platform, osType.
    """
    sys_descr = _get(oid_data, OID_SYS_DESCR)
    sys_uptime = _get(oid_data, OID_SYS_UPTIME)
    sys_name = _get(oid_data, OID_SYS_NAME)

    os_type = "unknown"
    platform = ""

    # IOS XR devices describe themselves as "Cisco IOS XR Software ..." (with
    # a space). Testing only for the hyphenated spelling let such devices
    # fall through to the "Cisco IOS" branch below and be labelled IOS-XE.
    # This test must stay ahead of the IOS-XE one for the same reason.
    if "IOS-XR" in sys_descr or "IOS XR" in sys_descr:
        os_type = "IOS-XR"
        # Try to extract version
        ver_match = re.search(r"Version\s+([\d.]+)", sys_descr)
        version = ver_match.group(1) if ver_match else ""
        # Try to extract platform from sysDescr
        plat_match = re.search(r"Cisco\s+(\S+)", sys_descr)
        plat_name = plat_match.group(1) if plat_match else "Cisco"
        platform = f"Cisco {plat_name} IOS-XR {version}".strip()
    elif "IOS Software" in sys_descr or "Cisco IOS" in sys_descr:
        # NOTE(review): any description mentioning "IOS Software" or
        # "Cisco IOS" is labelled IOS-XE here, with no separate classic-IOS
        # value - confirm that is intended.
        os_type = "IOS-XE"
        # Try to extract version
        ver_match = re.search(r"Version\s+([\d.()A-Za-z]+)", sys_descr)
        version = ver_match.group(1) if ver_match else ""
        # Try to extract platform (e.g., CAT3K, C3850, Catalyst)
        plat_match = re.search(r"(CAT\w+|C\d\w+|Catalyst\s+\S+)", sys_descr)
        if not plat_match:
            plat_match = re.search(r"Cisco\s+(\S+)", sys_descr)
        plat_name = plat_match.group(1) if plat_match else "Cisco"
        platform = f"Cisco {plat_name} IOS-XE {version}".strip()

    return {
        "sysName": sys_name,
        "sysDescr": sys_descr,
        "sysUpTime": sys_uptime,
        "platform": platform,
        "osType": os_type,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# Interface index builder
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def build_interface_index(oid_data):
    """Build {ifIndex: {"ifDescr": ..., "ifName": ...}} from the walk data.

    The ifIndex key is the OID suffix after the column prefix, kept as a
    string. An interface record only contains the fields that were present
    in the walk. All ifDescr rows are processed before any ifName row.
    """
    interfaces = {}
    columns = ((OID_IF_DESCR, "ifDescr"), (OID_IF_NAME, "ifName"))

    for column_oid, field in columns:
        column_prefix = column_oid + "."
        for oid, value in oid_data.items():
            if not oid.startswith(column_prefix):
                continue
            record = interfaces.setdefault(oid[len(column_prefix):], {})
            record[field] = value

    return interfaces
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# remPortId matching
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def match_rem_port_id(interfaces, rem_port_id):
    """Match a remPortId string to an ifIndex.

    Tries, in order:
      1. Exact match against ifName values
      2. Expand short name and match against ifDescr
      3. Direct match against ifDescr (already long form)
      4. Cross-platform fallback: swap the IOS-XE long prefix for its IOS-XR
         counterpart (or the reverse) and match against ifDescr

    Args:
        interfaces: dict from build_interface_index().
        rem_port_id: interface identifier, short or long form.

    Returns:
        ifIndex as string, or None when nothing matches (an empty
        rem_port_id never matches).
    """

    def _find(field, wanted):
        # First ifIndex whose `field` equals `wanted`, else None.
        for ifindex, info in interfaces.items():
            if info.get(field, "") == wanted:
                return ifindex
        return None

    # An empty id would otherwise "match" the first interface that simply
    # has no ifName / ifDescr recorded.
    if not rem_port_id:
        _dbg(f"WARNING: Could not match remPortId '{rem_port_id}' to any interface")
        return None

    # 1. Exact match against ifName
    ifindex = _find("ifName", rem_port_id)
    if ifindex is not None:
        _dbg(f"Matched remPortId '{rem_port_id}' via ifName -> ifIndex {ifindex}")
        return ifindex

    # 2. Expand short name and match ifDescr
    expanded = _expand_short_name(rem_port_id)
    if expanded != rem_port_id:
        ifindex = _find("ifDescr", expanded)
        if ifindex is not None:
            _dbg(f"Matched remPortId '{rem_port_id}' (expanded: '{expanded}') via ifDescr -> ifIndex {ifindex}")
            return ifindex

    # 3. Direct match against ifDescr
    ifindex = _find("ifDescr", rem_port_id)
    if ifindex is not None:
        _dbg(f"Matched remPortId '{rem_port_id}' via ifDescr direct -> ifIndex {ifindex}")
        return ifindex

    # 4. Cross-platform fallback: IOS-XE name on IOS-XR device or vice versa
    #    e.g., remPortId "Te0/0/0/5" expands to "TenGigabitEthernet0/0/0/5" (IOS-XE)
    #    but the NCS ifDescr is "TenGigE0/0/0/5" (IOS-XR).
    #    This step is deliberately NOT gated on "expansion changed the name":
    #    a long-form id such as "TenGigE0/0/0/5" expands to itself, and gating
    #    made the XR -> XE entries below unreachable.
    xr_xe_pairs = {
        "TenGigabitEthernet": "TenGigE",
        "TenGigE": "TenGigabitEthernet",
        "GigabitEthernet": "GigE",
        "GigE": "GigabitEthernet",
        "FortyGigabitEthernet": "FortyGigE",
        "FortyGigE": "FortyGigabitEthernet",
        "Port-channel": "Bundle-Ether",
        "Bundle-Ether": "Port-channel",
    }
    for native, foreign in xr_xe_pairs.items():
        if not expanded.startswith(native):
            continue
        alt = foreign + expanded[len(native):]
        ifindex = _find("ifDescr", alt)
        if ifindex is not None:
            _dbg(f"Matched remPortId '{rem_port_id}' via cross-platform '{alt}' -> ifIndex {ifindex}")
            return ifindex

    _dbg(f"WARNING: Could not match remPortId '{rem_port_id}' to any interface")
    return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# Interface facts extraction
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def get_interface_facts(oid_data, ifindex):
    """Collect the per-interface values for one ifIndex.

    Each value is looked up as "<column OID>.<ifindex>" and defaults to ""
    when the walk has no such entry. The result always contains every field
    listed below, in this order.
    """
    columns = (
        ("ifDescr", OID_IF_DESCR),
        ("ifType", OID_IF_TYPE),
        ("ifMtu", OID_IF_MTU),
        ("ifSpeed", OID_IF_SPEED),
        ("ifAdminStatus", OID_IF_ADMIN_STATUS),
        ("ifOperStatus", OID_IF_OPER_STATUS),
        ("ifInOctets", OID_IF_IN_OCTETS),
        ("ifInErrors", OID_IF_IN_ERRORS),
        ("ifOutOctets", OID_IF_OUT_OCTETS),
        ("ifOutErrors", OID_IF_OUT_ERRORS),
        ("ifInDiscards", OID_IF_IN_DISCARDS),
        ("ifOutDiscards", OID_IF_OUT_DISCARDS),
        ("ifName", OID_IF_NAME),
        ("ifHighSpeed", OID_IF_HIGH_SPEED),
        ("ifAlias", OID_IF_ALIAS),
        ("ifHCInOctets", OID_IF_HC_IN_OCTETS),
        ("ifHCOutOctets", OID_IF_HC_OUT_OCTETS),
    )
    instance = "." + str(ifindex)

    facts = {}
    for field, column_oid in columns:
        facts[field] = _get(oid_data, column_oid + instance)
    return facts
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# Subinterface discovery
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def discover_subinterfaces_stack(oid_data, parent_ifindex):
    """Discover interfaces layered on top of a parent via ifStackTable.

    In ifStackTable, .1.3.6.1.2.1.31.1.2.1.3.{higher}.{lower} = status.
    When lower == parent_ifindex, higher is an interface stacked on the
    parent. A higher value of 0 (represents 'not stacked') is skipped.
    The status value itself is not inspected.

    Args:
        oid_data: dict from parse_walk_file().
        parent_ifindex: ifIndex of the parent (str or int).

    Returns:
        set of ifIndex strings.
    """
    prefix = OID_IF_STACK_STATUS + "."
    wanted_lower = str(parent_ifindex)  # loop-invariant, computed once
    children = set()

    for oid in oid_data:
        if not oid.startswith(prefix):
            continue
        # Index part is "{higher}.{lower}"; anything without a dot is malformed.
        higher, sep, lower = oid[len(prefix):].partition(".")
        if not sep:
            continue
        if lower == wanted_lower and higher != "0":
            children.add(higher)

    return children
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def discover_subinterfaces_pattern(oid_data, parent_descr):
    """Discover subinterfaces by name.

    Selects every ifDescr entry whose value starts with '{parent_descr}.'
    (what follows the dot is not checked).

    Returns:
        dict mapping child ifIndex (string) to its ifDescr.
    """
    descr_column = OID_IF_DESCR + "."
    child_marker = parent_descr + "."

    return {
        oid[len(descr_column):]: descr
        for oid, descr in oid_data.items()
        if oid.startswith(descr_column) and descr.startswith(child_marker)
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def discover_subinterfaces(oid_data, parent_ifindex, parent_descr):
    """Return the union of ifIndex values found by both discovery methods.

    Combines ifStackTable-based discovery (by parent ifIndex) with
    name-based discovery (by parent ifDescr) into one set of ifIndex strings.
    """
    found_by_stack = discover_subinterfaces_stack(oid_data, parent_ifindex)
    found_by_name = discover_subinterfaces_pattern(oid_data, parent_descr)

    merged = set(found_by_stack)
    merged.update(found_by_name)  # iterating the dict adds its ifIndex keys
    return merged
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# VLAN / SVI discovery (IOS-XE)
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def discover_vlans(oid_data, os_type):
    """Collect VLAN SVIs for IOS-XE devices.

    When os_type is "IOS-XE", every ifDescr that is exactly 'Vlan{N}' is
    recorded under key N (string) with its ifIndex, ifDescr, ifAlias,
    ifAdminStatus and ifOperStatus. For any other os_type the result is an
    empty dict.
    """
    if os_type != "IOS-XE":
        return {}

    descr_column = OID_IF_DESCR + "."
    svi_name = re.compile(r"^Vlan(\d+)$")
    vlans = {}

    for oid, descr in oid_data.items():
        if not oid.startswith(descr_column):
            continue
        hit = svi_name.match(descr)
        if hit is None:
            continue

        ifindex = oid[len(descr_column):]
        vlans[hit.group(1)] = {
            "ifIndex": ifindex,
            "ifDescr": descr,
            "ifAlias": _get(oid_data, f"{OID_IF_ALIAS}.{ifindex}"),
            "ifAdminStatus": _get(oid_data, f"{OID_IF_ADMIN_STATUS}.{ifindex}"),
            "ifOperStatus": _get(oid_data, f"{OID_IF_OPER_STATUS}.{ifindex}"),
        }

    return vlans
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# BDI / BVI correlation
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def find_bdi_bvi(oid_data, vlan_id):
    """Locate the first interface whose ifDescr is BDI{vlan_id} or BVI{vlan_id}.

    Returns:
        (ifIndex, ifDescr, ifOperStatus, ifAlias) for the first match in
        walk order, or (None, None, None, None) when there is none.
    """
    descr_column = OID_IF_DESCR + "."
    wanted = (f"BDI{vlan_id}", f"BVI{vlan_id}")

    for oid, descr in oid_data.items():
        if oid.startswith(descr_column) and descr in wanted:
            ifindex = oid[len(descr_column):]
            oper_status = _get(oid_data, f"{OID_IF_OPER_STATUS}.{ifindex}")
            alias = _get(oid_data, f"{OID_IF_ALIAS}.{ifindex}")
            return (ifindex, descr, oper_status, alias)

    return (None, None, None, None)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# Optics / Entity Sensor (best-effort)
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def get_optics_info(oid_data, parent_ifindex, parent_descr):
    """Best-effort extraction of optics data from ENTITY-MIB and
    CISCO-ENTITY-SENSOR-MIB.

    Strategy:
      1. Scan entPhysicalName (.1.3.6.1.2.1.47.1.1.1.1.7.{entIdx}) for
         entities whose name mentions the interface, either as
         `parent_descr` or as its abbreviated form. The name must appear as
         a whole token: not preceded by a letter/digit and not followed by a
         digit, so 'Te1/1/3' does not match 'Te1/1/30' and
         'GigabitEthernet1/1/3' does not match 'TenGigabitEthernet1/1/3'.
      2. For those entity indices, read entSensorValue
         (.1.3.6.1.4.1.9.9.91.1.1.1.1.4.{entIdx}).
      3. Classify each reading via entSensorType (8 = celsius, 14 = dBm) and
         keywords in entPhysicalDescr. When several sensors fall in the same
         class, the last one processed wins.

    Args:
        oid_data: dict from parse_walk_file().
        parent_ifindex: accepted for interface compatibility; not used.
        parent_descr: ifDescr of the interface of interest.

    Returns:
        dict with txPower, rxPower, temperature. Each is the sensor value
        string exactly as found in the walk (no scale/precision applied),
        or None when not found.
    """
    result = {
        "txPower": None,
        "rxPower": None,
        "temperature": None,
    }

    ent_name_prefix = OID_ENT_PHYS_PREFIX + ".7."        # entPhysicalName
    ent_descr_prefix = OID_ENT_PHYS_PREFIX + ".2."       # entPhysicalDescr
    sensor_value_prefix = OID_CISCO_SENSOR_PREFIX + ".4."  # entSensorValue
    sensor_type_prefix = OID_CISCO_SENSOR_PREFIX + ".1."   # entSensorType

    # An empty description would match every entity; bail out instead.
    if not parent_descr:
        return result

    # Longest alternative first so the full name is preferred over its
    # abbreviation at the same position.
    names = sorted({parent_descr, _shorten_name(parent_descr)}, key=len, reverse=True)
    name_re = re.compile(
        r"(?<![A-Za-z0-9])(?:"
        + "|".join(re.escape(name) for name in names)
        + r")(?![0-9])"
    )

    matching_ent_indices = [
        oid[len(ent_name_prefix):]
        for oid, value in oid_data.items()
        if oid.startswith(ent_name_prefix) and name_re.search(value)
    ]
    if not matching_ent_indices:
        return result

    _dbg(f"Found {len(matching_ent_indices)} entity entries for {parent_descr}")

    for ent_idx in matching_ent_indices:
        sensor_val = _get(oid_data, f"{sensor_value_prefix}{ent_idx}")
        if not sensor_val:
            # Entity has no sensor reading (e.g. the transceiver container).
            continue

        sensor_type = _get(oid_data, f"{sensor_type_prefix}{ent_idx}")
        ent_descr = _get(oid_data, f"{ent_descr_prefix}{ent_idx}").lower()

        # Temperature is checked first; description keywords take priority
        # over the bare dBm type for telling Tx from Rx.
        if sensor_type == "8" or "temperature" in ent_descr or "temp" in ent_descr:
            result["temperature"] = sensor_val
        elif "transmit" in ent_descr or "tx" in ent_descr:
            result["txPower"] = sensor_val
        elif "receive" in ent_descr or "rx" in ent_descr:
            result["rxPower"] = sensor_val
        elif sensor_type == "14":
            # dBm but unclassified - assign to first empty power slot
            if result["txPower"] is None:
                result["txPower"] = sensor_val
            elif result["rxPower"] is None:
                result["rxPower"] = sensor_val

    return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# Debug helper
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def _dbg(msg):
    """Write a '[cisco-parse] '-prefixed progress line to stderr."""
    sys.stderr.write(f"[cisco-parse] {msg}\n")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# Main builder
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def build_neighbor_output(oid_data, rem_port_id):
    """Assemble the structured neighbor report from parsed OID data.

    Args:
        oid_data: dict from parse_walk_file()
        rem_port_id: interface identifier to focus on (e.g., 'Te1/1/3')

    Returns:
        dict with the sections neighbor_system, queried_interface,
        subinterfaces, vlans, optics and _meta. In _meta, target_ip,
        walk_lines and elapsed_sec are placeholders ("", 0, 0.0).
    """
    # --- system -----------------------------------------------------------
    sys_info = get_system_info(oid_data)
    _dbg(f"System: {sys_info['sysName']} ({sys_info['osType']})")

    # --- interface lookup -------------------------------------------------
    interfaces = build_interface_index(oid_data)
    _dbg(f"Found {len(interfaces)} interfaces in walk data")
    matched_ifindex = match_rem_port_id(interfaces, rem_port_id)

    # --- queried interface ------------------------------------------------
    # Fields copied from the interface facts, in output order (after ifIndex).
    fact_fields = (
        "ifDescr", "ifName", "ifAlias", "ifType",
        "ifAdminStatus", "ifOperStatus", "ifSpeed", "ifHighSpeed",
        "ifMtu", "ifHCInOctets", "ifHCOutOctets", "ifInErrors",
        "ifOutErrors", "ifInDiscards", "ifOutDiscards",
    )
    queried_iface = {"remPortId": rem_port_id}
    parent_descr = ""

    if matched_ifindex is None:
        # Keep the JSON schema stable: same keys, empty values.
        queried_iface["ifIndex"] = ""
        for field in fact_fields:
            queried_iface[field] = ""
        _dbg("WARNING: No interface matched, output will have empty queried_interface")
    else:
        facts = get_interface_facts(oid_data, matched_ifindex)
        parent_descr = facts.get("ifDescr", "")
        queried_iface["ifIndex"] = matched_ifindex
        for field in fact_fields:
            queried_iface[field] = facts[field]
        _dbg(f"Queried interface: {parent_descr} (ifIndex {matched_ifindex})")

    parent_known = bool(matched_ifindex and parent_descr)

    # --- subinterfaces ----------------------------------------------------
    subinterfaces = {}
    if parent_known:
        child_indices = discover_subinterfaces(oid_data, matched_ifindex, parent_descr)
        _dbg(f"Found {len(child_indices)} subinterfaces")

        def _numeric_order(index):
            # Non-numeric indices all sort as 0.
            return int(index) if index.isdigit() else 0

        for child_idx in sorted(child_indices, key=_numeric_order):
            child_facts = get_interface_facts(oid_data, child_idx)
            child_descr = child_facts.get("ifDescr", "")

            # VLAN ID is taken from a trailing ".NNNN" in the description.
            suffix = re.search(r"\.(\d+)$", child_descr)
            vlan_id = suffix.group(1) if suffix else ""

            # BDI/BVI correlation only makes sense with a VLAN ID.
            if vlan_id:
                bvi_ifindex, bvi_descr, bvi_oper, bvi_alias = find_bdi_bvi(oid_data, vlan_id)
            else:
                bvi_ifindex = bvi_descr = bvi_oper = bvi_alias = None

            subinterfaces[child_idx] = {
                "ifDescr": child_descr,
                "ifName": child_facts.get("ifName", ""),
                "ifAlias": child_facts.get("ifAlias", ""),
                "ifAdminStatus": child_facts.get("ifAdminStatus", ""),
                "ifOperStatus": child_facts.get("ifOperStatus", ""),
                "vlanId": vlan_id,
                "bvi_ifIndex": bvi_ifindex,
                "bvi_ifDescr": bvi_descr,
                "bvi_ifOperStatus": bvi_oper,
                "bvi_ifAlias": bvi_alias,
            }

    # --- VLAN SVIs (IOS-XE only) -------------------------------------------
    vlans = discover_vlans(oid_data, sys_info["osType"])
    _dbg(f"Found {len(vlans)} VLAN SVIs")

    # --- optics (best-effort) ---------------------------------------------
    if parent_known:
        optics = get_optics_info(oid_data, matched_ifindex, parent_descr)
    else:
        optics = {"txPower": None, "rxPower": None, "temperature": None}

    # --- assemble -----------------------------------------------------------
    meta = {
        "polled_at": datetime.now(timezone.utc).isoformat(),
        "target_ip": "",
        "walk_lines": 0,
        "elapsed_sec": 0.0,
        "remPortId": rem_port_id,
    }
    return {
        "neighbor_system": sys_info,
        "queried_interface": queried_iface,
        "subinterfaces": subinterfaces,
        "vlans": vlans,
        "optics": optics,
        "_meta": meta,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
# CLI entry point
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Reads <walk_file> and <remPortId> (plus an optional output path) from
    sys.argv, parses the walk, builds the neighbor report, fills in the
    _meta fields and writes the result as indented JSON. Exits with status 1
    after printing usage when arguments are missing, or when the walk file
    does not exist.
    """
    argv = sys.argv
    if len(argv) < 3:
        usage_lines = (
            "Usage: cisco-parse.py <walk_file> <remPortId> [output.json]",
            "",
            "Arguments:",
            "  walk_file     Raw snmpbulkwalk -On -OQ output file",
            "  remPortId     Interface to focus on (e.g., Te1/1/3, Gi1/0/39)",
            "  output.json   Optional output path (default: {stem}_neighbor_monitoring.json)",
        )
        print("\n".join(usage_lines))
        sys.exit(1)

    walk_file = Path(argv[1])
    rem_port_id = argv[2]
    if len(argv) > 3:
        output_path = Path(argv[3])
    else:
        # Default: next to the walk file, named after its stem.
        output_path = walk_file.with_name(walk_file.stem + "_neighbor_monitoring.json")

    if not walk_file.exists():
        print(f"Error: walk file not found: {walk_file}", file=sys.stderr)
        sys.exit(1)

    _dbg(f"Parsing {walk_file}")
    _dbg(f"Looking for remPortId: {rem_port_id}")

    started = time.time()
    oid_data = parse_walk_file(walk_file)
    _dbg(f"Parsed {len(oid_data)} OID entries")

    result = build_neighbor_output(oid_data, rem_port_id)
    meta = result["_meta"]
    # walk_lines holds the number of parsed OID entries.
    meta["walk_lines"] = len(oid_data)
    meta["elapsed_sec"] = round(time.time() - started, 2)

    # Try to extract target IP from filename pattern: {IP}_{timestamp}_..._walk.txt
    # e.g., 172-16-50-4_2026-03-01_10-00-00_neighbor_walk.txt
    ip_match = re.match(r"([\d-]+)_", walk_file.stem)
    if ip_match:
        meta["target_ip"] = ip_match.group(1).replace("-", ".")

    output_path.write_text(json.dumps(result, indent=2))
    print(f"Wrote {output_path}")


if __name__ == "__main__":
    main()
|