#!/usr/bin/env python3 """ Cisco SNMP Walk Parser Parses raw snmpbulkwalk -On -OQ output from a Cisco device and produces a structured JSON file with interface details focused on a specific remPortId (LLDP neighbor interface). Usage: python3 cisco-parse.py [output.json] Examples: python3 cisco-parse.py walks/172-16-50-4_walk.txt Te1/1/3 python3 cisco-parse.py walks/172-16-50-4_walk.txt Gi1/0/39 output.json """ import json import re import sys import time from datetime import datetime, timezone from pathlib import Path # ──────────────────────────────────────────────────────────────────────── # Cisco short-name <-> long-name mapping # ──────────────────────────────────────────────────────────────────────── CISCO_SHORT_TO_LONG = { # IOS-XE naming "Te": "TenGigabitEthernet", "Gi": "GigabitEthernet", "Fa": "FastEthernet", "Fo": "FortyGigabitEthernet", "Tw": "TwentyFiveGigE", "Twe": "TwentyFiveGigE", "Eth": "Ethernet", "Po": "Port-channel", "Vl": "Vlan", # IOS-XR naming (NCS 5500, ASR 9000, etc.) "TenGigE": "TenGigE", "HundredGigE": "HundredGigE", "TwentyFiveGigE": "TwentyFiveGigE", "FortyGigE": "FortyGigE", "GigE": "GigE", # rare but exists on some XR platforms "Bundle-Ether": "Bundle-Ether", "BE": "Bundle-Ether", # Common to both "Hu": "HundredGigE", "BDI": "BDI", "BVI": "BVI", "Lo": "Loopback", } # Reverse map: long -> short (built from the canonical mapping) CISCO_LONG_TO_SHORT = {} for _short, _long in CISCO_SHORT_TO_LONG.items(): # If multiple shorts map to the same long, prefer the shorter abbreviation if _long not in CISCO_LONG_TO_SHORT or len(_short) < len(CISCO_LONG_TO_SHORT[_long]): CISCO_LONG_TO_SHORT[_long] = _short # ──────────────────────────────────────────────────────────────────────── # OID constants # ──────────────────────────────────────────────────────────────────────── OID_SYS_DESCR = ".1.3.6.1.2.1.1.1.0" OID_SYS_UPTIME = ".1.3.6.1.2.1.1.3.0" OID_SYS_NAME = ".1.3.6.1.2.1.1.5.0" # IF-MIB base prefixes (append .{ifIndex}) OID_IF_DESCR = ".1.3.6.1.2.1.2.2.1.2" OID_IF_TYPE = ".1.3.6.1.2.1.2.2.1.3" OID_IF_MTU = ".1.3.6.1.2.1.2.2.1.4" OID_IF_SPEED = ".1.3.6.1.2.1.2.2.1.5" OID_IF_ADMIN_STATUS = ".1.3.6.1.2.1.2.2.1.7" OID_IF_OPER_STATUS = ".1.3.6.1.2.1.2.2.1.8" OID_IF_IN_OCTETS = ".1.3.6.1.2.1.2.2.1.10" OID_IF_IN_DISCARDS = ".1.3.6.1.2.1.2.2.1.13" OID_IF_IN_ERRORS = ".1.3.6.1.2.1.2.2.1.14" OID_IF_OUT_OCTETS = ".1.3.6.1.2.1.2.2.1.16" OID_IF_OUT_DISCARDS = ".1.3.6.1.2.1.2.2.1.19" OID_IF_OUT_ERRORS = ".1.3.6.1.2.1.2.2.1.20" # IF-MIB ifXTable OID_IF_NAME = ".1.3.6.1.2.1.31.1.1.1.1" OID_IF_HC_IN_OCTETS = ".1.3.6.1.2.1.31.1.1.1.6" OID_IF_HC_OUT_OCTETS = ".1.3.6.1.2.1.31.1.1.1.10" OID_IF_HIGH_SPEED = ".1.3.6.1.2.1.31.1.1.1.15" OID_IF_ALIAS = ".1.3.6.1.2.1.31.1.1.1.18" # ifStackTable: .1.3.6.1.2.1.31.1.2.1.3.{higher}.{lower} OID_IF_STACK_STATUS = ".1.3.6.1.2.1.31.1.2.1.3" # ENTITY-MIB OID_ENT_PHYS_PREFIX = ".1.3.6.1.2.1.47.1.1.1.1" # CISCO-ENTITY-SENSOR-MIB OID_CISCO_SENSOR_PREFIX = ".1.3.6.1.4.1.9.9.91.1.1.1.1" # ──────────────────────────────────────────────────────────────────────── # Walk file parser # ──────────────────────────────────────────────────────────────────────── def parse_walk_file(walk_file): """Parse an snmpbulkwalk -On -OQ output file into {oid: value} dict. Lines look like: .1.3.6.1.2.1.2.2.1.2.62 = "TenGigabitEthernet1/1/3" .1.3.6.1.2.1.2.2.1.7.62 = 1 String values have surrounding quotes stripped. """ walk_path = Path(walk_file) oid_data = {} with walk_path.open("r", errors="replace") as fh: pending_oid = None pending_val = None for raw_line in fh: line = raw_line.rstrip("\n\r") # Handle multi-line values (e.g., Cisco sysDescr spans multiple lines) if pending_oid is not None: # Continuation of a multi-line quoted value pending_val += " " + line.strip() if '"' in line: # Closing quote found — finalize val = pending_val.strip() if val.startswith('"'): val = val[1:] if val.endswith('"'): val = val[:-1] oid_data[pending_oid] = val pending_oid = None pending_val = None continue line = line.strip() if not line or line.startswith("#"): continue # Split on first ' = ' parts = line.split(" = ", 1) if len(parts) != 2: continue oid = parts[0].strip() value = parts[1].strip() # Check for opening quote without closing (multi-line value) if value.startswith('"') and not value.endswith('"'): pending_oid = oid pending_val = value continue # Strip surrounding quotes if len(value) >= 2 and value[0] == '"' and value[-1] == '"': value = value[1:-1] oid_data[oid] = value # Handle any trailing pending value if pending_oid is not None: val = pending_val.strip().strip('"') oid_data[pending_oid] = val return oid_data # ──────────────────────────────────────────────────────────────────────── # Helpers # ──────────────────────────────────────────────────────────────────────── def _get(oid_data, oid, default=""): """Safely retrieve a value from oid_data.""" return oid_data.get(oid, default) def _get_subtree(oid_data, prefix): """Return all entries whose OID starts with prefix (with trailing dot).""" dotted = prefix if prefix.endswith(".") else prefix + "." return {k: v for k, v in oid_data.items() if k.startswith(dotted) or k == prefix} def _expand_short_name(short_name): """Expand a Cisco short interface name to full form. 'Te1/1/3' -> 'TenGigabitEthernet1/1/3' 'Gi1/0/39' -> 'GigabitEthernet1/0/39' """ # Try each prefix, longest first to avoid partial matches (e.g., Twe before Tw) for short in sorted(CISCO_SHORT_TO_LONG.keys(), key=len, reverse=True): if short_name.startswith(short): remainder = short_name[len(short):] # Verify the remainder starts with a digit or slash (real interface numbering) if remainder and (remainder[0].isdigit() or remainder[0] == "/"): return CISCO_SHORT_TO_LONG[short] + remainder return short_name def _shorten_name(long_name): """Shorten a Cisco full interface name to abbreviated form. 'TenGigabitEthernet1/1/3' -> 'Te1/1/3' """ for long, short in sorted(CISCO_LONG_TO_SHORT.items(), key=lambda x: len(x[0]), reverse=True): if long_name.startswith(long): return short + long_name[len(long):] return long_name # ──────────────────────────────────────────────────────────────────────── # System info extraction # ──────────────────────────────────────────────────────────────────────── def get_system_info(oid_data): """Extract system-level information.""" sys_descr = _get(oid_data, OID_SYS_DESCR) sys_uptime = _get(oid_data, OID_SYS_UPTIME) sys_name = _get(oid_data, OID_SYS_NAME) os_type = "unknown" platform = "" if "IOS-XR" in sys_descr: os_type = "IOS-XR" # Try to extract version ver_match = re.search(r"Version\s+([\d.]+)", sys_descr) version = ver_match.group(1) if ver_match else "" # Try to extract platform from sysDescr plat_match = re.search(r"Cisco\s+(\S+)", sys_descr) plat_name = plat_match.group(1) if plat_match else "Cisco" platform = f"Cisco {plat_name} IOS-XR {version}".strip() elif "IOS Software" in sys_descr or "Cisco IOS" in sys_descr: os_type = "IOS-XE" # Try to extract version ver_match = re.search(r"Version\s+([\d.()A-Za-z]+)", sys_descr) version = ver_match.group(1) if ver_match else "" # Try to extract platform (e.g., CAT3K, C3850, Catalyst) plat_match = re.search(r"(CAT\w+|C\d\w+|Catalyst\s+\S+)", sys_descr) if not plat_match: plat_match = re.search(r"Cisco\s+(\S+)", sys_descr) plat_name = plat_match.group(1) if plat_match else "Cisco" platform = f"Cisco {plat_name} IOS-XE {version}".strip() return { "sysName": sys_name, "sysDescr": sys_descr, "sysUpTime": sys_uptime, "platform": platform, "osType": os_type, } # ──────────────────────────────────────────────────────────────────────── # Interface index builder # ──────────────────────────────────────────────────────────────────────── def build_interface_index(oid_data): """Build a dict of ifIndex -> {ifDescr, ifName, ...} from walk data.""" interfaces = {} # Collect ifDescr entries descr_prefix = OID_IF_DESCR + "." for oid, value in oid_data.items(): if oid.startswith(descr_prefix): ifindex = oid[len(descr_prefix):] if ifindex not in interfaces: interfaces[ifindex] = {} interfaces[ifindex]["ifDescr"] = value # Collect ifName entries name_prefix = OID_IF_NAME + "." for oid, value in oid_data.items(): if oid.startswith(name_prefix): ifindex = oid[len(name_prefix):] if ifindex not in interfaces: interfaces[ifindex] = {} interfaces[ifindex]["ifName"] = value return interfaces # ──────────────────────────────────────────────────────────────────────── # remPortId matching # ──────────────────────────────────────────────────────────────────────── def match_rem_port_id(interfaces, rem_port_id): """Match a remPortId string to an ifIndex. Tries: 1. Exact match against ifName values 2. Expand short name and match against ifDescr 3. Direct match against ifDescr (already long form) Returns ifIndex as string, or None. """ # 1. Exact match against ifName for ifindex, info in interfaces.items(): if info.get("ifName", "") == rem_port_id: _dbg(f"Matched remPortId '{rem_port_id}' via ifName -> ifIndex {ifindex}") return ifindex # 2. Expand short name and match ifDescr expanded = _expand_short_name(rem_port_id) if expanded != rem_port_id: for ifindex, info in interfaces.items(): if info.get("ifDescr", "") == expanded: _dbg(f"Matched remPortId '{rem_port_id}' (expanded: '{expanded}') via ifDescr -> ifIndex {ifindex}") return ifindex # 3. Direct match against ifDescr for ifindex, info in interfaces.items(): if info.get("ifDescr", "") == rem_port_id: _dbg(f"Matched remPortId '{rem_port_id}' via ifDescr direct -> ifIndex {ifindex}") return ifindex # 4. Cross-platform fallback: IOS-XE short name on IOS-XR device or vice versa # e.g., remPortId "Te0/0/0/5" expands to "TenGigabitEthernet0/0/0/5" (IOS-XE) # but the NCS ifDescr is "TenGigE0/0/0/5" (IOS-XR) # Try alternate expansions by extracting the slot/port suffix and matching xr_xe_pairs = { "TenGigabitEthernet": "TenGigE", "TenGigE": "TenGigabitEthernet", "GigabitEthernet": "GigE", "GigE": "GigabitEthernet", "FortyGigabitEthernet": "FortyGigE", "FortyGigE": "FortyGigabitEthernet", "Port-channel": "Bundle-Ether", "Bundle-Ether": "Port-channel", } if expanded != rem_port_id: for xe_name, xr_name in xr_xe_pairs.items(): if expanded.startswith(xe_name): alt = xr_name + expanded[len(xe_name):] for ifindex, info in interfaces.items(): if info.get("ifDescr", "") == alt: _dbg(f"Matched remPortId '{rem_port_id}' via cross-platform '{alt}' -> ifIndex {ifindex}") return ifindex _dbg(f"WARNING: Could not match remPortId '{rem_port_id}' to any interface") return None # ──────────────────────────────────────────────────────────────────────── # Interface facts extraction # ──────────────────────────────────────────────────────────────────────── def get_interface_facts(oid_data, ifindex): """Extract all relevant facts for a given ifIndex.""" idx = str(ifindex) return { "ifDescr": _get(oid_data, f"{OID_IF_DESCR}.{idx}"), "ifType": _get(oid_data, f"{OID_IF_TYPE}.{idx}"), "ifMtu": _get(oid_data, f"{OID_IF_MTU}.{idx}"), "ifSpeed": _get(oid_data, f"{OID_IF_SPEED}.{idx}"), "ifAdminStatus": _get(oid_data, f"{OID_IF_ADMIN_STATUS}.{idx}"), "ifOperStatus": _get(oid_data, f"{OID_IF_OPER_STATUS}.{idx}"), "ifInOctets": _get(oid_data, f"{OID_IF_IN_OCTETS}.{idx}"), "ifInErrors": _get(oid_data, f"{OID_IF_IN_ERRORS}.{idx}"), "ifOutOctets": _get(oid_data, f"{OID_IF_OUT_OCTETS}.{idx}"), "ifOutErrors": _get(oid_data, f"{OID_IF_OUT_ERRORS}.{idx}"), "ifInDiscards": _get(oid_data, f"{OID_IF_IN_DISCARDS}.{idx}"), "ifOutDiscards": _get(oid_data, f"{OID_IF_OUT_DISCARDS}.{idx}"), "ifName": _get(oid_data, f"{OID_IF_NAME}.{idx}"), "ifHighSpeed": _get(oid_data, f"{OID_IF_HIGH_SPEED}.{idx}"), "ifAlias": _get(oid_data, f"{OID_IF_ALIAS}.{idx}"), "ifHCInOctets": _get(oid_data, f"{OID_IF_HC_IN_OCTETS}.{idx}"), "ifHCOutOctets": _get(oid_data, f"{OID_IF_HC_OUT_OCTETS}.{idx}"), } # ──────────────────────────────────────────────────────────────────────── # Subinterface discovery # ──────────────────────────────────────────────────────────────────────── def discover_subinterfaces_stack(oid_data, parent_ifindex): """Discover child interfaces via ifStackTable. In ifStackTable, .1.3.6.1.2.1.31.1.2.1.3.{higher}.{lower} = status When lower == parent_ifindex, higher is a child/sub-interface layered on top of the parent. Skip ifIndex 0 (represents 'not stacked'). """ children = set() prefix = OID_IF_STACK_STATUS + "." parent_suffix = f".{parent_ifindex}" for oid in oid_data: if not oid.startswith(prefix): continue remainder = oid[len(prefix):] parts = remainder.split(".", 1) if len(parts) != 2: continue higher, lower = parts[0], parts[1] if lower == str(parent_ifindex) and higher != "0": children.add(higher) return children def discover_subinterfaces_pattern(oid_data, parent_descr): """Discover subinterfaces by name pattern matching. Look for ifDescr values like '{parent_descr}.{number}'. """ children = {} prefix = OID_IF_DESCR + "." pattern = parent_descr + "." for oid, value in oid_data.items(): if not oid.startswith(prefix): continue if value.startswith(pattern): ifindex = oid[len(prefix):] children[ifindex] = value return children def discover_subinterfaces(oid_data, parent_ifindex, parent_descr): """Combine stack-based and pattern-based subinterface discovery.""" # Approach A: ifStackTable stack_children = discover_subinterfaces_stack(oid_data, parent_ifindex) # Approach B: pattern matching pattern_children = discover_subinterfaces_pattern(oid_data, parent_descr) # Merge: union of both sets of ifIndex values all_child_indices = set(stack_children) | set(pattern_children.keys()) return all_child_indices # ──────────────────────────────────────────────────────────────────────── # VLAN / SVI discovery (IOS-XE) # ──────────────────────────────────────────────────────────────────────── def discover_vlans(oid_data, os_type): """Discover VLAN SVIs for IOS-XE devices. On IOS-XE Catalyst switches, scan ifDescr for 'Vlan{N}' entries and collect their interface facts. """ vlans = {} if os_type != "IOS-XE": return vlans prefix = OID_IF_DESCR + "." vlan_re = re.compile(r"^Vlan(\d+)$") for oid, value in oid_data.items(): if not oid.startswith(prefix): continue m = vlan_re.match(value) if not m: continue vlan_num = m.group(1) ifindex = oid[len(prefix):] vlans[vlan_num] = { "ifIndex": ifindex, "ifDescr": value, "ifAlias": _get(oid_data, f"{OID_IF_ALIAS}.{ifindex}"), "ifAdminStatus": _get(oid_data, f"{OID_IF_ADMIN_STATUS}.{ifindex}"), "ifOperStatus": _get(oid_data, f"{OID_IF_OPER_STATUS}.{ifindex}"), } return vlans # ──────────────────────────────────────────────────────────────────────── # BDI / BVI correlation # ──────────────────────────────────────────────────────────────────────── def find_bdi_bvi(oid_data, vlan_id): """Find a BDI{vlan_id} or BVI{vlan_id} interface if it exists. Returns (ifIndex, ifDescr, ifOperStatus, ifAlias) or (None, None, None, None). """ prefix = OID_IF_DESCR + "." targets = [f"BDI{vlan_id}", f"BVI{vlan_id}"] for oid, value in oid_data.items(): if not oid.startswith(prefix): continue if value in targets: ifindex = oid[len(prefix):] return ( ifindex, value, _get(oid_data, f"{OID_IF_OPER_STATUS}.{ifindex}"), _get(oid_data, f"{OID_IF_ALIAS}.{ifindex}"), ) return (None, None, None, None) # ──────────────────────────────────────────────────────────────────────── # Optics / Entity Sensor (best-effort) # ──────────────────────────────────────────────────────────────────────── def get_optics_info(oid_data, parent_ifindex, parent_descr): """Best-effort extraction of optics data from ENTITY-MIB and CISCO-ENTITY-SENSOR-MIB. Strategy: 1. Walk entPhysicalName (.1.3.6.1.2.1.47.1.1.1.1.7) to find entities whose name contains the interface name/description. 2. For those entity indices, look up sensor values from CISCO-ENTITY-SENSOR-MIB (.1.3.6.1.4.1.9.9.91.1.1.1.1.4.{entIdx}). 3. Use entPhysicalDescr or entSensorType to classify as Tx/Rx/temp. Returns dict with txPower, rxPower, temperature (all may be None). """ result = { "txPower": None, "rxPower": None, "temperature": None, } # entPhysicalName: .1.3.6.1.2.1.47.1.1.1.1.7.{entIdx} ent_name_prefix = OID_ENT_PHYS_PREFIX.rsplit(".", 1)[0] + ".7." # Actually: .1.3.6.1.2.1.47.1.1.1.1.7.{idx} ent_name_prefix = ".1.3.6.1.2.1.47.1.1.1.1.7." # entPhysicalDescr: .1.3.6.1.2.1.47.1.1.1.1.2.{idx} ent_descr_prefix = ".1.3.6.1.2.1.47.1.1.1.1.2." # entSensorValue: .1.3.6.1.4.1.9.9.91.1.1.1.1.4.{idx} sensor_value_prefix = ".1.3.6.1.4.1.9.9.91.1.1.1.1.4." # entSensorType: .1.3.6.1.4.1.9.9.91.1.1.1.1.1.{idx} sensor_type_prefix = ".1.3.6.1.4.1.9.9.91.1.1.1.1.1." # Find entity indices that match the interface short_name = _shorten_name(parent_descr) matching_ent_indices = [] for oid, value in oid_data.items(): if not oid.startswith(ent_name_prefix): continue # Check if entity name references our interface if parent_descr in value or short_name in value: ent_idx = oid[len(ent_name_prefix):] matching_ent_indices.append(ent_idx) if not matching_ent_indices: return result _dbg(f"Found {len(matching_ent_indices)} entity entries for {parent_descr}") # For matching entities, look up sensor readings for ent_idx in matching_ent_indices: sensor_val = _get(oid_data, f"{sensor_value_prefix}{ent_idx}") if not sensor_val: continue # Determine sensor type from entSensorType or entPhysicalDescr sensor_type = _get(oid_data, f"{sensor_type_prefix}{ent_idx}") ent_descr = _get(oid_data, f"{ent_descr_prefix}{ent_idx}").lower() # entSensorType: 8 = celsius, 14 = dBm # Also check description text for classification if sensor_type == "8" or "temperature" in ent_descr or "temp" in ent_descr: result["temperature"] = sensor_val elif "transmit" in ent_descr or "tx" in ent_descr: result["txPower"] = sensor_val elif "receive" in ent_descr or "rx" in ent_descr: result["rxPower"] = sensor_val elif sensor_type == "14": # dBm but unclassified — assign to first empty power slot if result["txPower"] is None: result["txPower"] = sensor_val elif result["rxPower"] is None: result["rxPower"] = sensor_val return result # ──────────────────────────────────────────────────────────────────────── # Debug helper # ──────────────────────────────────────────────────────────────────────── def _dbg(msg): """Print debug/progress info to stderr.""" print(f"[cisco-parse] {msg}", file=sys.stderr) # ──────────────────────────────────────────────────────────────────────── # Main builder # ──────────────────────────────────────────────────────────────────────── def build_neighbor_output(oid_data, rem_port_id): """Build the structured neighbor output from parsed OID data. Args: oid_data: dict from parse_walk_file() rem_port_id: interface identifier to focus on (e.g., 'Te1/1/3') Returns: dict matching the output JSON structure """ # System info sys_info = get_system_info(oid_data) _dbg(f"System: {sys_info['sysName']} ({sys_info['osType']})") # Build interface index interfaces = build_interface_index(oid_data) _dbg(f"Found {len(interfaces)} interfaces in walk data") # Match remPortId to ifIndex matched_ifindex = match_rem_port_id(interfaces, rem_port_id) # Build queried interface section queried_iface = {"remPortId": rem_port_id} parent_descr = "" if matched_ifindex is not None: facts = get_interface_facts(oid_data, matched_ifindex) parent_descr = facts.get("ifDescr", "") queried_iface.update({ "ifIndex": matched_ifindex, "ifDescr": facts["ifDescr"], "ifName": facts["ifName"], "ifAlias": facts["ifAlias"], "ifType": facts["ifType"], "ifAdminStatus": facts["ifAdminStatus"], "ifOperStatus": facts["ifOperStatus"], "ifSpeed": facts["ifSpeed"], "ifHighSpeed": facts["ifHighSpeed"], "ifMtu": facts["ifMtu"], "ifHCInOctets": facts["ifHCInOctets"], "ifHCOutOctets": facts["ifHCOutOctets"], "ifInErrors": facts["ifInErrors"], "ifOutErrors": facts["ifOutErrors"], "ifInDiscards": facts["ifInDiscards"], "ifOutDiscards": facts["ifOutDiscards"], }) _dbg(f"Queried interface: {parent_descr} (ifIndex {matched_ifindex})") else: # Fill with empty values so the JSON schema is consistent for key in ("ifIndex", "ifDescr", "ifName", "ifAlias", "ifType", "ifAdminStatus", "ifOperStatus", "ifSpeed", "ifHighSpeed", "ifMtu", "ifHCInOctets", "ifHCOutOctets", "ifInErrors", "ifOutErrors", "ifInDiscards", "ifOutDiscards"): queried_iface[key] = "" _dbg("WARNING: No interface matched, output will have empty queried_interface") # Discover subinterfaces subinterfaces = {} if matched_ifindex and parent_descr: child_indices = discover_subinterfaces(oid_data, matched_ifindex, parent_descr) _dbg(f"Found {len(child_indices)} subinterfaces") for child_idx in sorted(child_indices, key=lambda x: int(x) if x.isdigit() else 0): child_facts = get_interface_facts(oid_data, child_idx) child_descr = child_facts.get("ifDescr", "") # Extract VLAN ID from .NNNN suffix vlan_id = "" vlan_match = re.search(r"\.(\d+)$", child_descr) if vlan_match: vlan_id = vlan_match.group(1) # BDI/BVI correlation bvi_ifindex, bvi_descr, bvi_oper, bvi_alias = (None, None, None, None) if vlan_id: bvi_ifindex, bvi_descr, bvi_oper, bvi_alias = find_bdi_bvi(oid_data, vlan_id) subinterfaces[child_idx] = { "ifDescr": child_descr, "ifName": child_facts.get("ifName", ""), "ifAlias": child_facts.get("ifAlias", ""), "ifAdminStatus": child_facts.get("ifAdminStatus", ""), "ifOperStatus": child_facts.get("ifOperStatus", ""), "vlanId": vlan_id, "bvi_ifIndex": bvi_ifindex, "bvi_ifDescr": bvi_descr, "bvi_ifOperStatus": bvi_oper, "bvi_ifAlias": bvi_alias, } # Discover VLANs (IOS-XE) vlans = discover_vlans(oid_data, sys_info["osType"]) _dbg(f"Found {len(vlans)} VLAN SVIs") # Optics (best-effort) optics = {"txPower": None, "rxPower": None, "temperature": None} if matched_ifindex and parent_descr: optics = get_optics_info(oid_data, matched_ifindex, parent_descr) # Assemble output output = { "neighbor_system": sys_info, "queried_interface": queried_iface, "subinterfaces": subinterfaces, "vlans": vlans, "optics": optics, "_meta": { "polled_at": datetime.now(timezone.utc).isoformat(), "target_ip": "", "walk_lines": 0, "elapsed_sec": 0.0, "remPortId": rem_port_id, }, } return output # ──────────────────────────────────────────────────────────────────────── # CLI entry point # ──────────────────────────────────────────────────────────────────────── def main(): if len(sys.argv) < 3: print("Usage: cisco-parse.py [output.json]") print() print("Arguments:") print(" walk_file Raw snmpbulkwalk -On -OQ output file") print(" remPortId Interface to focus on (e.g., Te1/1/3, Gi1/0/39)") print(" output.json Optional output path (default: {stem}_neighbor_monitoring.json)") sys.exit(1) walk_file = Path(sys.argv[1]) rem_port_id = sys.argv[2] output_path = ( Path(sys.argv[3]) if len(sys.argv) > 3 else walk_file.with_name(walk_file.stem + "_neighbor_monitoring.json") ) if not walk_file.exists(): print(f"Error: walk file not found: {walk_file}", file=sys.stderr) sys.exit(1) _dbg(f"Parsing {walk_file}") _dbg(f"Looking for remPortId: {rem_port_id}") t_start = time.time() oid_data = parse_walk_file(walk_file) _dbg(f"Parsed {len(oid_data)} OID entries") result = build_neighbor_output(oid_data, rem_port_id) result["_meta"]["walk_lines"] = len(oid_data) result["_meta"]["elapsed_sec"] = round(time.time() - t_start, 2) # Try to extract target IP from filename pattern: {IP}_{timestamp}_..._walk.txt # e.g., 172-16-50-4_2026-03-01_10-00-00_neighbor_walk.txt name = walk_file.stem ip_match = re.match(r"([\d-]+)_", name) if ip_match: result["_meta"]["target_ip"] = ip_match.group(1).replace("-", ".") output_path.write_text(json.dumps(result, indent=2)) print(f"Wrote {output_path}") if __name__ == "__main__": main()