nid-snmp/cisco_parse.py
sam c285810c68 Two-phase focused neighbor walk and fix status/optics bugs
- Restructure neighbor walk into Phase 1 (discovery: ifDescr + ifName +
  ifStackTable) and Phase 2 (targeted snmpget for matched interfaces only).
  Reduces NCS 5500 walk from ~150k OIDs to ~20k discovery + ~600 targeted.
- Rename cisco-parse.py to cisco_parse.py for Python import compatibility.
- Add parse_walk_text() for in-process parsing without file I/O.
- Fix interface status showing DOWN/ADMIN DOWN: use isUp() instead of
  hardcoded === '1' checks, add -Oe flag to snmpget for numeric enums.
- Fix optics showing raw sensor values: apply entSensorPrecision scaling
  (e.g., -95122 with precision 4 → -9.5122 dBm).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 10:58:34 -07:00

800 lines
33 KiB
Python

#!/usr/bin/env python3
"""
Cisco SNMP Walk Parser
Parses raw snmpbulkwalk -On -OQ output from a Cisco device and produces
a structured JSON file with interface details focused on a specific
remPortId (LLDP neighbor interface).
Usage:
    python3 cisco_parse.py <walk_file.txt> <remPortId> [output.json]
Examples:
    python3 cisco_parse.py walks/172-16-50-4_walk.txt Te1/1/3
    python3 cisco_parse.py walks/172-16-50-4_walk.txt Gi1/0/39 output.json
"""
import json
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
# ────────────────────────────────────────────────────────────────────────
# Cisco short-name <-> long-name mapping
# ────────────────────────────────────────────────────────────────────────
CISCO_SHORT_TO_LONG = {
    # IOS-XE naming
    "Te": "TenGigabitEthernet",
    "Gi": "GigabitEthernet",
    "Fa": "FastEthernet",
    "Fo": "FortyGigabitEthernet",
    "Tw": "TwentyFiveGigE",
    "Twe": "TwentyFiveGigE",
    "Eth": "Ethernet",
    "Po": "Port-channel",
    "Vl": "Vlan",
    # IOS-XR naming (NCS 5500, ASR 9000, etc.); several map to themselves
    "TenGigE": "TenGigE",
    "HundredGigE": "HundredGigE",
    "TwentyFiveGigE": "TwentyFiveGigE",
    "FortyGigE": "FortyGigE",
    "GigE": "GigE",  # rare but exists on some XR platforms
    "Bundle-Ether": "Bundle-Ether",
    "BE": "Bundle-Ether",
    # Common to both
    "Hu": "HundredGigE",
    "BDI": "BDI",
    "BVI": "BVI",
    "Lo": "Loopback",
}

# Reverse map: long -> short, derived from the table above.
# The first abbreviation seen for a long name is recorded; a later one only
# replaces it when it is strictly shorter (so "Tw" wins over "Twe").
CISCO_LONG_TO_SHORT = {}
for _short, _long in CISCO_SHORT_TO_LONG.items():
    if len(CISCO_LONG_TO_SHORT.setdefault(_long, _short)) > len(_short):
        CISCO_LONG_TO_SHORT[_long] = _short
# ────────────────────────────────────────────────────────────────────────
# OID constants
# ────────────────────────────────────────────────────────────────────────
# All standard-MIB OIDs below share the .1.3.6.1.2.1 root.
_MIB2 = ".1.3.6.1.2.1"

# System scalars (instance .0 included)
OID_SYS_DESCR = _MIB2 + ".1.1.0"
OID_SYS_UPTIME = _MIB2 + ".1.3.0"
OID_SYS_NAME = _MIB2 + ".1.5.0"

# IF-MIB base prefixes (append .{ifIndex})
_IF_ENTRY = _MIB2 + ".2.2.1"
OID_IF_DESCR = _IF_ENTRY + ".2"
OID_IF_TYPE = _IF_ENTRY + ".3"
OID_IF_MTU = _IF_ENTRY + ".4"
OID_IF_SPEED = _IF_ENTRY + ".5"
OID_IF_ADMIN_STATUS = _IF_ENTRY + ".7"
OID_IF_OPER_STATUS = _IF_ENTRY + ".8"
OID_IF_IN_OCTETS = _IF_ENTRY + ".10"
OID_IF_IN_DISCARDS = _IF_ENTRY + ".13"
OID_IF_IN_ERRORS = _IF_ENTRY + ".14"
OID_IF_OUT_OCTETS = _IF_ENTRY + ".16"
OID_IF_OUT_DISCARDS = _IF_ENTRY + ".19"
OID_IF_OUT_ERRORS = _IF_ENTRY + ".20"

# IF-MIB ifXTable (append .{ifIndex})
_IFX_ENTRY = _MIB2 + ".31.1.1.1"
OID_IF_NAME = _IFX_ENTRY + ".1"
OID_IF_HC_IN_OCTETS = _IFX_ENTRY + ".6"
OID_IF_HC_OUT_OCTETS = _IFX_ENTRY + ".10"
OID_IF_HIGH_SPEED = _IFX_ENTRY + ".15"
OID_IF_ALIAS = _IFX_ENTRY + ".18"

# ifStackTable: {OID_IF_STACK_STATUS}.{higher}.{lower}
OID_IF_STACK_STATUS = _MIB2 + ".31.1.2.1.3"

# ENTITY-MIB (append .{column}.{entIdx})
OID_ENT_PHYS_PREFIX = _MIB2 + ".47.1.1.1.1"

# CISCO-ENTITY-SENSOR-MIB (append .{column}.{entIdx})
OID_CISCO_SENSOR_PREFIX = ".1.3.6.1.4.1.9.9.91.1.1.1.1"
# ────────────────────────────────────────────────────────────────────────
# Walk file parser
# ────────────────────────────────────────────────────────────────────────
def _parse_lines(lines):
"""Parse snmpbulkwalk/snmpget -On -OQ output lines into {oid: value} dict.
Handles multi-line quoted values (e.g., Cisco sysDescr).
String values have surrounding quotes stripped.
"""
oid_data = {}
pending_oid = None
pending_val = None
for raw_line in lines:
line = raw_line.rstrip("\n\r")
if pending_oid is not None:
pending_val += " " + line.strip()
if '"' in line:
val = pending_val.strip()
if val.startswith('"'):
val = val[1:]
if val.endswith('"'):
val = val[:-1]
oid_data[pending_oid] = val
pending_oid = None
pending_val = None
continue
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split(" = ", 1)
if len(parts) != 2:
continue
oid = parts[0].strip()
value = parts[1].strip()
if value.startswith('"') and not value.endswith('"'):
pending_oid = oid
pending_val = value
continue
if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
value = value[1:-1]
oid_data[oid] = value
if pending_oid is not None:
val = pending_val.strip().strip('"')
oid_data[pending_oid] = val
return oid_data
def parse_walk_file(walk_file):
    """Read a saved snmpbulkwalk -On -OQ dump and return its {oid: value} dict.

    Args:
        walk_file: path (str or Path) of the walk output file.

    Returns:
        dict produced by _parse_lines() for the file's lines.
    """
    source = Path(walk_file)
    # errors="replace": undecodable bytes are substituted instead of raising.
    with source.open(mode="r", errors="replace") as handle:
        parsed = _parse_lines(handle)
    return parsed
def parse_walk_text(text):
    """Parse snmpbulkwalk/snmpget -On -OQ output held in a string.

    In-memory counterpart of parse_walk_file(): the text is split into lines
    and handed to _parse_lines(), so no temporary file is needed.

    Args:
        text: the complete command output as one string.

    Returns:
        dict mapping OID strings to value strings.
    """
    rows = text.splitlines()
    return _parse_lines(rows)
# ────────────────────────────────────────────────────────────────────────
# Helpers
# ────────────────────────────────────────────────────────────────────────
def _get(oid_data, oid, default=""):
"""Safely retrieve a value from oid_data."""
return oid_data.get(oid, default)
def _get_subtree(oid_data, prefix):
"""Return all entries whose OID starts with prefix (with trailing dot)."""
dotted = prefix if prefix.endswith(".") else prefix + "."
return {k: v for k, v in oid_data.items() if k.startswith(dotted) or k == prefix}
def _expand_short_name(short_name):
    """Expand an abbreviated Cisco interface name to its long form.

    'Te1/1/3'  -> 'TenGigabitEthernet1/1/3'
    'Gi1/0/39' -> 'GigabitEthernet1/0/39'

    The input is returned unchanged when no known abbreviation applies.
    """
    # Longest abbreviations first so that e.g. "Twe" is tried before "Tw".
    by_length = sorted(CISCO_SHORT_TO_LONG, key=len, reverse=True)
    for abbrev in by_length:
        if not short_name.startswith(abbrev):
            continue
        tail = short_name[len(abbrev):]
        # Only accept the prefix when real interface numbering follows it
        # (a digit or a slash); an empty tail never qualifies.
        lead = tail[:1]
        if lead.isdigit() or lead == "/":
            return CISCO_SHORT_TO_LONG[abbrev] + tail
    return short_name
def _shorten_name(long_name):
    """Abbreviate a long Cisco interface name.

    'TenGigabitEthernet1/1/3' -> 'Te1/1/3'

    The input is returned unchanged when it starts with no known long name.
    """
    # Longest long-names first so the most specific prefix is the one replaced.
    ordered = sorted(CISCO_LONG_TO_SHORT, key=len, reverse=True)
    for full in ordered:
        if long_name.startswith(full):
            return CISCO_LONG_TO_SHORT[full] + long_name[len(full):]
    return long_name
# ────────────────────────────────────────────────────────────────────────
# System info extraction
# ────────────────────────────────────────────────────────────────────────
def get_system_info(oid_data):
    """Extract system-level information from parsed walk data.

    Reads sysDescr, sysUpTime and sysName and derives two best-effort fields
    from the sysDescr text:

    * osType: "IOS-XR" when sysDescr mentions IOS XR (written "IOS XR" or
      "IOS-XR"), "IOS-XE" when it contains "IOS Software" or "Cisco IOS"
      (classic IOS banners therefore also get this label), else "unknown".
    * platform: "Cisco <platform> <osType> <version>", or "" when the OS
      could not be identified.

    Args:
        oid_data: {oid: value} dict as returned by the walk parsers.

    Returns:
        dict with keys sysName, sysDescr, sysUpTime, platform, osType.
    """
    sys_descr = _get(oid_data, OID_SYS_DESCR)
    sys_uptime = _get(oid_data, OID_SYS_UPTIME)
    sys_name = _get(oid_data, OID_SYS_NAME)
    os_type = "unknown"
    platform = ""
    # IOS XR has to be tested first: a banner such as
    # "Cisco IOS XR Software (NCS-5500), Version 7.1.2" also contains
    # "Cisco IOS" and would otherwise be labelled IOS-XE.
    if re.search(r"IOS[- ]XR", sys_descr):
        os_type = "IOS-XR"
        # Try to extract version
        ver_match = re.search(r"Version\s+([\d.]+)", sys_descr)
        version = ver_match.group(1) if ver_match else ""
        # Prefer the parenthesised platform tag that follows "Software"
        # (dropping a leading "Cisco " so it is not doubled below); fall
        # back to the first token after "Cisco".
        plat_match = re.search(r"Software\s*\((?:Cisco\s+)?([^)]+)\)", sys_descr)
        if not plat_match:
            plat_match = re.search(r"Cisco\s+(\S+)", sys_descr)
        plat_name = plat_match.group(1) if plat_match else "Cisco"
        platform = f"Cisco {plat_name} IOS-XR {version}".strip()
    elif "IOS Software" in sys_descr or "Cisco IOS" in sys_descr:
        os_type = "IOS-XE"
        # Try to extract version
        ver_match = re.search(r"Version\s+([\d.()A-Za-z]+)", sys_descr)
        version = ver_match.group(1) if ver_match else ""
        # Try to extract platform (e.g., CAT3K, C3850, Catalyst)
        plat_match = re.search(r"(CAT\w+|C\d\w+|Catalyst\s+\S+)", sys_descr)
        if not plat_match:
            plat_match = re.search(r"Cisco\s+(\S+)", sys_descr)
        plat_name = plat_match.group(1) if plat_match else "Cisco"
        platform = f"Cisco {plat_name} IOS-XE {version}".strip()
    return {
        "sysName": sys_name,
        "sysDescr": sys_descr,
        "sysUpTime": sys_uptime,
        "platform": platform,
        "osType": os_type,
    }
# ────────────────────────────────────────────────────────────────────────
# Interface index builder
# ────────────────────────────────────────────────────────────────────────
def build_interface_index(oid_data):
    """Index the walk data by ifIndex.

    Returns a dict {ifIndex: {"ifDescr": ..., "ifName": ...}} where ifIndex
    is the OID suffix as a string. Either key may be missing when the walk
    held only one of the two columns for that interface.
    """
    interfaces = {}
    # ifDescr is collected in a first full pass, ifName in a second one, so
    # interfaces known by ifDescr come first in the resulting dict.
    for field, base in (("ifDescr", OID_IF_DESCR), ("ifName", OID_IF_NAME)):
        column = base + "."
        for oid, value in oid_data.items():
            if oid.startswith(column):
                interfaces.setdefault(oid[len(column):], {})[field] = value
    return interfaces
# ────────────────────────────────────────────────────────────────────────
# remPortId matching
# ────────────────────────────────────────────────────────────────────────
def match_rem_port_id(interfaces, rem_port_id):
    """Match a remPortId string to an ifIndex.

    Tries, in order, returning the first hit:
    1. Exact match against ifName values
    2. Expand short name and match against ifDescr
    3. Direct match against ifDescr (already long form)
    4. Cross-platform fallback: swap the IOS-XE long prefix for its IOS-XR
       counterpart (or the reverse) and match against ifDescr. This is
       attempted for short, long and XR-native port IDs alike.

    Args:
        interfaces: dict from build_interface_index().
        rem_port_id: interface identifier, e.g. 'Te1/1/3'.

    Returns:
        ifIndex as string, or None when nothing matches or rem_port_id is
        empty.
    """
    # An empty id would otherwise compare equal to the "" default used for
    # interfaces lacking ifName/ifDescr and match an arbitrary interface.
    if not rem_port_id:
        _dbg(f"WARNING: Could not match remPortId '{rem_port_id}' to any interface")
        return None

    def _first_with(field, wanted):
        """Return the first ifIndex whose `field` equals `wanted`, else None."""
        for candidate, info in interfaces.items():
            if info.get(field, "") == wanted:
                return candidate
        return None

    # 1. Exact match against ifName
    ifindex = _first_with("ifName", rem_port_id)
    if ifindex is not None:
        _dbg(f"Matched remPortId '{rem_port_id}' via ifName -> ifIndex {ifindex}")
        return ifindex

    # 2. Expand short name and match ifDescr
    expanded = _expand_short_name(rem_port_id)
    if expanded != rem_port_id:
        ifindex = _first_with("ifDescr", expanded)
        if ifindex is not None:
            _dbg(f"Matched remPortId '{rem_port_id}' (expanded: '{expanded}') via ifDescr -> ifIndex {ifindex}")
            return ifindex

    # 3. Direct match against ifDescr
    ifindex = _first_with("ifDescr", rem_port_id)
    if ifindex is not None:
        _dbg(f"Matched remPortId '{rem_port_id}' via ifDescr direct -> ifIndex {ifindex}")
        return ifindex

    # 4. Cross-platform fallback: IOS-XE name on an IOS-XR device or vice versa
    #    e.g., remPortId "Te0/0/0/5" expands to "TenGigabitEthernet0/0/0/5"
    #    (IOS-XE) but the NCS ifDescr is "TenGigE0/0/0/5" (IOS-XR).
    #    XR names expand to themselves, so this step must not depend on the
    #    expansion having changed the id.
    xr_xe_pairs = {
        "TenGigabitEthernet": "TenGigE",
        "TenGigE": "TenGigabitEthernet",
        "GigabitEthernet": "GigE",
        "GigE": "GigabitEthernet",
        "FortyGigabitEthernet": "FortyGigE",
        "FortyGigE": "FortyGigabitEthernet",
        "Port-channel": "Bundle-Ether",
        "Bundle-Ether": "Port-channel",
    }
    for known, alternate in xr_xe_pairs.items():
        if not expanded.startswith(known):
            continue
        alt = alternate + expanded[len(known):]
        ifindex = _first_with("ifDescr", alt)
        if ifindex is not None:
            _dbg(f"Matched remPortId '{rem_port_id}' via cross-platform '{alt}' -> ifIndex {ifindex}")
            return ifindex

    _dbg(f"WARNING: Could not match remPortId '{rem_port_id}' to any interface")
    return None
# ────────────────────────────────────────────────────────────────────────
# Interface facts extraction
# ────────────────────────────────────────────────────────────────────────
def get_interface_facts(oid_data, ifindex):
    """Collect the per-interface values of one ifIndex into a dict.

    Every key is always present; a column missing from the walk yields "".

    Args:
        oid_data: {oid: value} dict as returned by the walk parsers.
        ifindex: interface index (str or int).

    Returns:
        dict keyed by IF-MIB object name (ifDescr, ifType, ...).
    """
    idx = str(ifindex)
    # (result key, column base OID); the order fixes the key order of the result.
    columns = (
        ("ifDescr", OID_IF_DESCR),
        ("ifType", OID_IF_TYPE),
        ("ifMtu", OID_IF_MTU),
        ("ifSpeed", OID_IF_SPEED),
        ("ifAdminStatus", OID_IF_ADMIN_STATUS),
        ("ifOperStatus", OID_IF_OPER_STATUS),
        ("ifInOctets", OID_IF_IN_OCTETS),
        ("ifInErrors", OID_IF_IN_ERRORS),
        ("ifOutOctets", OID_IF_OUT_OCTETS),
        ("ifOutErrors", OID_IF_OUT_ERRORS),
        ("ifInDiscards", OID_IF_IN_DISCARDS),
        ("ifOutDiscards", OID_IF_OUT_DISCARDS),
        ("ifName", OID_IF_NAME),
        ("ifHighSpeed", OID_IF_HIGH_SPEED),
        ("ifAlias", OID_IF_ALIAS),
        ("ifHCInOctets", OID_IF_HC_IN_OCTETS),
        ("ifHCOutOctets", OID_IF_HC_OUT_OCTETS),
    )
    return {field: _get(oid_data, f"{base}.{idx}") for field, base in columns}
# ────────────────────────────────────────────────────────────────────────
# Subinterface discovery
# ────────────────────────────────────────────────────────────────────────
def discover_subinterfaces_stack(oid_data, parent_ifindex):
    """Discover child interfaces via ifStackTable.

    In ifStackTable, .1.3.6.1.2.1.31.1.2.1.3.{higher}.{lower} = status.
    When lower == parent_ifindex, higher is a child/sub-interface layered
    on top of the parent. Skip ifIndex 0 (represents 'not stacked').
    Only the OID index is inspected; the status value itself is not checked.

    Args:
        oid_data: {oid: value} dict as returned by the walk parsers.
        parent_ifindex: ifIndex of the parent interface (str or int).

    Returns:
        set of child ifIndex strings (possibly empty).
    """
    children = set()
    prefix = OID_IF_STACK_STATUS + "."
    parent = str(parent_ifindex)
    for oid in oid_data:
        if not oid.startswith(prefix):
            continue
        # The remainder is "{higher}.{lower}"; entries without a dot are malformed.
        higher, sep, lower = oid[len(prefix):].partition(".")
        if not sep:
            continue
        if lower == parent and higher != "0":
            children.add(higher)
    return children
def discover_subinterfaces_pattern(oid_data, parent_descr):
    """Discover subinterfaces by name.

    Selects every ifDescr entry whose value starts with '{parent_descr}.'.

    Returns:
        dict {ifIndex: ifDescr} of the matching interfaces.
    """
    column = OID_IF_DESCR + "."
    wanted = parent_descr + "."
    return {
        oid[len(column):]: descr
        for oid, descr in oid_data.items()
        if oid.startswith(column) and descr.startswith(wanted)
    }
def discover_subinterfaces(oid_data, parent_ifindex, parent_descr):
    """Return the set of child ifIndex values found by either discovery method.

    The result is the union of the ifStackTable lookup and the ifDescr
    name-pattern lookup.
    """
    from_stack = discover_subinterfaces_stack(oid_data, parent_ifindex)
    from_names = discover_subinterfaces_pattern(oid_data, parent_descr)
    # from_names is a dict; union() takes its keys (the ifIndex strings).
    return set(from_stack).union(from_names)
# ────────────────────────────────────────────────────────────────────────
# VLAN / SVI discovery (IOS-XE)
# ────────────────────────────────────────────────────────────────────────
def discover_vlans(oid_data, os_type):
    """Discover VLAN SVIs; only performed when os_type is "IOS-XE".

    Scans ifDescr for values of the exact form 'Vlan{N}' and records the
    alias and admin/oper status of each.

    Returns:
        dict {vlan number (str): {ifIndex, ifDescr, ifAlias, ifAdminStatus,
        ifOperStatus}}; empty for any other os_type.
    """
    if os_type != "IOS-XE":
        return {}
    column = OID_IF_DESCR + "."
    svi_name = re.compile(r"^Vlan(\d+)$")
    found = {}
    for oid, descr in oid_data.items():
        hit = svi_name.match(descr) if oid.startswith(column) else None
        if hit is None:
            continue
        ifindex = oid[len(column):]
        found[hit.group(1)] = {
            "ifIndex": ifindex,
            "ifDescr": descr,
            "ifAlias": _get(oid_data, f"{OID_IF_ALIAS}.{ifindex}"),
            "ifAdminStatus": _get(oid_data, f"{OID_IF_ADMIN_STATUS}.{ifindex}"),
            "ifOperStatus": _get(oid_data, f"{OID_IF_OPER_STATUS}.{ifindex}"),
        }
    return found
# ────────────────────────────────────────────────────────────────────────
# BDI / BVI correlation
# ────────────────────────────────────────────────────────────────────────
def find_bdi_bvi(oid_data, vlan_id):
    """Look up the interface named BDI{vlan_id} or BVI{vlan_id}.

    The first ifDescr entry (in oid_data order) equal to either name wins.

    Returns:
        (ifIndex, ifDescr, ifOperStatus, ifAlias) for the interface found, or
        (None, None, None, None) when neither name exists.
    """
    column = OID_IF_DESCR + "."
    wanted = (f"BDI{vlan_id}", f"BVI{vlan_id}")
    for oid, descr in oid_data.items():
        if oid.startswith(column) and descr in wanted:
            ifindex = oid[len(column):]
            oper_status = _get(oid_data, f"{OID_IF_OPER_STATUS}.{ifindex}")
            alias = _get(oid_data, f"{OID_IF_ALIAS}.{ifindex}")
            return ifindex, descr, oper_status, alias
    return None, None, None, None
# ────────────────────────────────────────────────────────────────────────
# Optics / Entity Sensor (best-effort)
# ────────────────────────────────────────────────────────────────────────
def get_optics_info(oid_data, parent_ifindex, parent_descr):
    """Best-effort extraction of optics data from ENTITY-MIB and
    CISCO-ENTITY-SENSOR-MIB.

    Strategy:
    1. Scan entPhysicalName ({OID_ENT_PHYS_PREFIX}.7.{entIdx}) for entities
       whose name contains the interface's long or abbreviated name as a
       whole token: not preceded by a letter/digit and not followed by a
       digit, so 'Te1/1/3' does not pick up 'Te1/1/30' and
       'GigabitEthernet1/0/1' does not pick up 'TenGigabitEthernet1/0/1'.
    2. For those entity indices, read entSensorValue
       ({OID_CISCO_SENSOR_PREFIX}.4.{entIdx}) and scale it with
       entSensorPrecision (column 3). entSensorScale (column 2) is not
       applied here.
    3. Classify each reading as temperature/Tx/Rx from entSensorType
       (column 1) and the entPhysicalDescr text (column 2 of ENTITY-MIB).

    Args:
        oid_data: {oid: value} dict as returned by the walk parsers.
        parent_ifindex: ifIndex of the interface; currently unused, kept for
            the callers' signature.
        parent_descr: ifDescr of the interface.

    Returns:
        dict with txPower, rxPower, temperature (all may be None). Values
        are strings when present. If several matching sensors classify the
        same way, the last one seen wins.
    """
    result = {
        "txPower": None,
        "rxPower": None,
        "temperature": None,
    }
    if not parent_descr:
        # Without a name there is nothing meaningful to match entities against.
        return result

    ent_descr_prefix = OID_ENT_PHYS_PREFIX + ".2."         # entPhysicalDescr
    ent_name_prefix = OID_ENT_PHYS_PREFIX + ".7."          # entPhysicalName
    sensor_type_prefix = OID_CISCO_SENSOR_PREFIX + ".1."   # entSensorType
    # entSensorPrecision: number of decimal places to apply to entSensorValue
    sensor_precision_prefix = OID_CISCO_SENSOR_PREFIX + ".3."
    sensor_value_prefix = OID_CISCO_SENSOR_PREFIX + ".4."  # entSensorValue

    # Match either spelling of the interface name, as a whole token.
    names = [parent_descr]
    short_name = _shorten_name(parent_descr)
    if short_name and short_name != parent_descr:
        names.append(short_name)
    alternatives = "|".join(re.escape(name) for name in names)
    name_re = re.compile(rf"(?<![A-Za-z0-9])(?:{alternatives})(?!\d)")

    matching_ent_indices = [
        oid[len(ent_name_prefix):]
        for oid, value in oid_data.items()
        if oid.startswith(ent_name_prefix) and name_re.search(value)
    ]
    if not matching_ent_indices:
        return result
    _dbg(f"Found {len(matching_ent_indices)} entity entries for {parent_descr}")

    def _scale_sensor_value(raw_val, ent_idx):
        """Apply entSensorPrecision to scale a raw sensor value."""
        try:
            val = float(raw_val)
        except (ValueError, TypeError):
            # Non-numeric reading: hand it back untouched.
            return raw_val
        precision = _get(oid_data, f"{sensor_precision_prefix}{ent_idx}")
        try:
            prec = int(precision)
        except (ValueError, TypeError):
            prec = 0
        if prec > 0:
            val = val / (10 ** prec)
        # Round to avoid floating point noise
        return str(round(val, prec if prec > 0 else 1))

    # For matching entities, look up sensor readings
    for ent_idx in matching_ent_indices:
        sensor_val = _get(oid_data, f"{sensor_value_prefix}{ent_idx}")
        if not sensor_val:
            continue
        # Determine sensor type from entSensorType or entPhysicalDescr
        sensor_type = _get(oid_data, f"{sensor_type_prefix}{ent_idx}")
        ent_descr = _get(oid_data, f"{ent_descr_prefix}{ent_idx}").lower()
        # Scale the raw value using entSensorPrecision
        scaled = _scale_sensor_value(sensor_val, ent_idx)
        # entSensorType: 8 = celsius, 14 = dBm
        # Also check description text for classification
        if sensor_type == "8" or "temperature" in ent_descr or "temp" in ent_descr:
            result["temperature"] = scaled
        elif "transmit" in ent_descr or "tx" in ent_descr:
            result["txPower"] = scaled
        elif "receive" in ent_descr or "rx" in ent_descr:
            result["rxPower"] = scaled
        elif sensor_type == "14":
            # dBm but unclassified — assign to first empty power slot
            if result["txPower"] is None:
                result["txPower"] = scaled
            elif result["rxPower"] is None:
                result["rxPower"] = scaled
    return result
# ────────────────────────────────────────────────────────────────────────
# Debug helper
# ────────────────────────────────────────────────────────────────────────
def _dbg(msg):
"""Print debug/progress info to stderr."""
print(f"[cisco-parse] {msg}", file=sys.stderr)
# ────────────────────────────────────────────────────────────────────────
# Main builder
# ────────────────────────────────────────────────────────────────────────
def build_neighbor_output(oid_data, rem_port_id):
    """Assemble the structured neighbor report from parsed OID data.

    Args:
        oid_data: dict from parse_walk_file() / parse_walk_text()
        rem_port_id: interface identifier to focus on (e.g., 'Te1/1/3')

    Returns:
        dict with the sections neighbor_system, queried_interface,
        subinterfaces, vlans, optics and _meta. The _meta fields target_ip,
        walk_lines and elapsed_sec are placeholders ("" / 0 / 0.0) here.
    """
    # System info
    sys_info = get_system_info(oid_data)
    _dbg(f"System: {sys_info['sysName']} ({sys_info['osType']})")

    # Interface index and remPortId -> ifIndex resolution
    interfaces = build_interface_index(oid_data)
    _dbg(f"Found {len(interfaces)} interfaces in walk data")
    matched_ifindex = match_rem_port_id(interfaces, rem_port_id)

    # Queried interface section; the key order below is the JSON key order.
    fact_fields = (
        "ifDescr", "ifName", "ifAlias", "ifType",
        "ifAdminStatus", "ifOperStatus", "ifSpeed", "ifHighSpeed",
        "ifMtu", "ifHCInOctets", "ifHCOutOctets", "ifInErrors",
        "ifOutErrors", "ifInDiscards", "ifOutDiscards",
    )
    queried_iface = {"remPortId": rem_port_id}
    parent_descr = ""
    if matched_ifindex is None:
        # Fill with empty values so the JSON schema is consistent
        queried_iface["ifIndex"] = ""
        for field in fact_fields:
            queried_iface[field] = ""
        _dbg("WARNING: No interface matched, output will have empty queried_interface")
    else:
        facts = get_interface_facts(oid_data, matched_ifindex)
        parent_descr = facts.get("ifDescr", "")
        queried_iface["ifIndex"] = matched_ifindex
        for field in fact_fields:
            queried_iface[field] = facts[field]
        _dbg(f"Queried interface: {parent_descr} (ifIndex {matched_ifindex})")

    # Subinterfaces and optics both need a resolved parent with a description.
    has_parent = bool(matched_ifindex and parent_descr)

    def _numeric(index):
        """Sort key: numeric ifIndex value, 0 for anything non-numeric."""
        return int(index) if index.isdigit() else 0

    subinterfaces = {}
    if has_parent:
        child_indices = discover_subinterfaces(oid_data, matched_ifindex, parent_descr)
        _dbg(f"Found {len(child_indices)} subinterfaces")
        for child_idx in sorted(child_indices, key=_numeric):
            child_facts = get_interface_facts(oid_data, child_idx)
            child_descr = child_facts.get("ifDescr", "")
            # VLAN ID is taken from a trailing ".NNNN" of the description
            suffix = re.search(r"\.(\d+)$", child_descr)
            vlan_id = suffix.group(1) if suffix else ""
            # BDI/BVI correlation, only possible when a VLAN ID was found
            if vlan_id:
                bvi = find_bdi_bvi(oid_data, vlan_id)
            else:
                bvi = (None, None, None, None)
            bvi_ifindex, bvi_descr, bvi_oper, bvi_alias = bvi
            subinterfaces[child_idx] = {
                "ifDescr": child_descr,
                "ifName": child_facts.get("ifName", ""),
                "ifAlias": child_facts.get("ifAlias", ""),
                "ifAdminStatus": child_facts.get("ifAdminStatus", ""),
                "ifOperStatus": child_facts.get("ifOperStatus", ""),
                "vlanId": vlan_id,
                "bvi_ifIndex": bvi_ifindex,
                "bvi_ifDescr": bvi_descr,
                "bvi_ifOperStatus": bvi_oper,
                "bvi_ifAlias": bvi_alias,
            }

    # VLAN SVIs (IOS-XE)
    vlans = discover_vlans(oid_data, sys_info["osType"])
    _dbg(f"Found {len(vlans)} VLAN SVIs")

    # Optics (best-effort)
    if has_parent:
        optics = get_optics_info(oid_data, matched_ifindex, parent_descr)
    else:
        optics = {"txPower": None, "rxPower": None, "temperature": None}

    # Assemble output
    meta = {
        "polled_at": datetime.now(timezone.utc).isoformat(),
        "target_ip": "",
        "walk_lines": 0,
        "elapsed_sec": 0.0,
        "remPortId": rem_port_id,
    }
    return {
        "neighbor_system": sys_info,
        "queried_interface": queried_iface,
        "subinterfaces": subinterfaces,
        "vlans": vlans,
        "optics": optics,
        "_meta": meta,
    }
# ────────────────────────────────────────────────────────────────────────
# CLI entry point
# ────────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: parse a walk file and write the neighbor JSON.

    Expects ``<walk_file> <remPortId> [output.json]`` in sys.argv. Prints the
    usage text and exits with status 1 when arguments are missing, and exits
    with status 1 when the walk file does not exist. On success the JSON is
    written to the output path (default:
    ``{walk file stem}_neighbor_monitoring.json`` next to the walk file) and
    the path is reported on stdout.
    """
    if len(sys.argv) < 3:
        print("Usage: cisco_parse.py <walk_file> <remPortId> [output.json]")
        print()
        print("Arguments:")
        print(" walk_file Raw snmpbulkwalk -On -OQ output file")
        print(" remPortId Interface to focus on (e.g., Te1/1/3, Gi1/0/39)")
        print(" output.json Optional output path (default: {stem}_neighbor_monitoring.json)")
        sys.exit(1)
    walk_file = Path(sys.argv[1])
    rem_port_id = sys.argv[2]
    output_path = (
        Path(sys.argv[3])
        if len(sys.argv) > 3
        else walk_file.with_name(walk_file.stem + "_neighbor_monitoring.json")
    )
    if not walk_file.exists():
        print(f"Error: walk file not found: {walk_file}", file=sys.stderr)
        sys.exit(1)
    _dbg(f"Parsing {walk_file}")
    _dbg(f"Looking for remPortId: {rem_port_id}")
    t_start = time.time()
    oid_data = parse_walk_file(walk_file)
    _dbg(f"Parsed {len(oid_data)} OID entries")
    result = build_neighbor_output(oid_data, rem_port_id)
    # walk_lines records the number of parsed OID entries, not raw file lines.
    result["_meta"]["walk_lines"] = len(oid_data)
    result["_meta"]["elapsed_sec"] = round(time.time() - t_start, 2)
    # Try to extract target IP from filename pattern: {IP}_{timestamp}_..._walk.txt
    # e.g., 172-16-50-4_2026-03-01_10-00-00_neighbor_walk.txt
    # Require exactly four dash-separated groups so that a leading date such
    # as "2026-03-01_" is not mistaken for an address.
    name = walk_file.stem
    ip_match = re.match(r"(\d{1,3}(?:-\d{1,3}){3})_", name)
    if ip_match:
        result["_meta"]["target_ip"] = ip_match.group(1).replace("-", ".")
    output_path.write_text(json.dumps(result, indent=2))
    print(f"Wrote {output_path}")


if __name__ == "__main__":
    main()