Compare commits

...

9 Commits

Author SHA1 Message Date
sam
c285810c68 Two-phase focused neighbor walk and fix status/optics bugs
- Restructure neighbor walk into Phase 1 (discovery: ifDescr + ifName +
  ifStackTable) and Phase 2 (targeted snmpget for matched interfaces only).
  Reduces NCS 5500 walk from ~150k OIDs to ~20k discovery + ~600 targeted.
- Rename cisco-parse.py to cisco_parse.py for Python import compatibility.
- Add parse_walk_text() for in-process parsing without file I/O.
- Fix interface status showing DOWN/ADMIN DOWN: use isUp() instead of
  hardcoded === '1' checks, add -Oe flag to snmpget for numeric enums.
- Fix optics showing raw sensor values: apply entSensorPrecision scaling
  (e.g., -95122 with precision 4 → -9.5122 dBm).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 10:58:34 -07:00
Sam Lewis
9b98e260d1 added more images 2026-03-09 10:47:17 -07:00
sam
71eba349b8 Add IOS-XR interface naming support to cisco-parse.py
- Add TenGigE, HundredGigE, FortyGigE, GigE, Bundle-Ether
  to the short-name expansion map
- Add cross-platform matching fallback: when remPortId expands
  to an IOS-XE name but the target is IOS-XR (or vice versa),
  try the alternate naming convention
- Handles NCS 5500 where ifDescr uses TenGigE0/0/0/5 format
  vs IOS-XE TenGigabitEthernet1/1/3

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 11:06:23 -07:00
sam
c64a80810f Add LLDP neighbor device polling (Cisco C3850 support)
Follow the LLDP breadcrumb back to the connected router/switch:
- New cisco-parse.py: standalone parser for Cisco SNMP walk data
  with interface matching, subinterface/SVI discovery, BDI/BVI
  correlation, and optics extraction
- New /api/neighbor-walk and /api/neighbor-data endpoints
- "Poll Neighbor" button in LLDP topology cards
- Connected Neighbor Devices card showing interface status,
  counters, SVIs, and subinterface mappings
- Platform-aware: handles IOS-XE (SVIs) and IOS-XR (subinterfaces)
- Tested against lab C3850-04 (172.16.50.4) — 4,288 OIDs in 1.1s

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 10:48:44 -07:00
sam
bcb179e7e4 Fix TruthValue handling for different firmware versions
Some Accedian firmware reports SNMP TruthValue fields as '1'/'2'
(INTEGER) while others use 'true'/'false' (textual). Add isTrue()
helper that accepts both formats and replace all 15+ boolean checks
across the viewer (present, diagCapable, active, enabled, filter
enable flags, etc).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 08:30:36 -07:00
Sam Lewis
4daf26b778 Added walks folder 2026-03-06 08:22:22 -07:00
Sam Lewis
09a2db7373 added images 2026-03-06 08:02:33 -07:00
sam
df8c74627b Dynamic port rendering for multi-port NID devices
Replace hardcoded 4-port loops in renderPanel(), renderSfp(), and
renderLldp() with dynamic iteration over connectors data. Devices
like the AMO-10000-LT-S with more than 4 ports now render all
ports automatically. Management port detected by connector name.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 19:45:34 -07:00
sam
d6bf394297 Add optional policy MIB toggle and pre-walk ping check
- Add SNMP_WALK_POLICIES env var and UI checkbox to skip ACD-POLICY-MIB
  (~73% of all OIDs), cutting walk time from ~25s to ~11s
- Add /api/ping endpoint with reachability check before walk starts
- Show "NID Management is UP" (green) or "NID is DOWN" (red) status
- Block walk if target is unreachable

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 10:02:52 -07:00
39 changed files with 286542 additions and 264 deletions

View File

@ -22,5 +22,16 @@ SNMP_COMMUNITY=public
# "targeted" = walk only subtrees used by the viewer (faster)
SNMP_WALK_MODE=targeted
# ── Policy data ──
# ACD-POLICY-MIB is ~73% of all OIDs. Set to "false" to skip it for faster walks.
# The Traffic Policies card will be empty when disabled.
SNMP_WALK_POLICIES=true
# ── Neighbor Device SNMP ──
# Credentials for polling LLDP-discovered neighbor devices (Cisco routers/switches)
# Falls back to NID credentials (SNMP_COMMUNITY / v3 settings) if not set
NEIGHBOR_SNMP_VERSION=2c
NEIGHBOR_SNMP_COMMUNITY=public
# ── Server ──
SERVER_PORT=5525

File diff suppressed because it is too large Load Diff

799
cisco_parse.py Normal file
View File

@ -0,0 +1,799 @@
#!/usr/bin/env python3
"""
Cisco SNMP Walk Parser
Parses raw snmpbulkwalk -On -OQ output from a Cisco device and produces
a structured JSON file with interface details focused on a specific
remPortId (LLDP neighbor interface).
Usage:
python3 cisco-parse.py <walk_file.txt> <remPortId> [output.json]
Examples:
python3 cisco-parse.py walks/172-16-50-4_walk.txt Te1/1/3
python3 cisco-parse.py walks/172-16-50-4_walk.txt Gi1/0/39 output.json
"""
import json
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
# ────────────────────────────────────────────────────────────────────────
# Cisco short-name <-> long-name mapping
# ────────────────────────────────────────────────────────────────────────
CISCO_SHORT_TO_LONG = {
# IOS-XE naming
"Te": "TenGigabitEthernet",
"Gi": "GigabitEthernet",
"Fa": "FastEthernet",
"Fo": "FortyGigabitEthernet",
"Tw": "TwentyFiveGigE",
"Twe": "TwentyFiveGigE",
"Eth": "Ethernet",
"Po": "Port-channel",
"Vl": "Vlan",
# IOS-XR naming (NCS 5500, ASR 9000, etc.)
"TenGigE": "TenGigE",
"HundredGigE": "HundredGigE",
"TwentyFiveGigE": "TwentyFiveGigE",
"FortyGigE": "FortyGigE",
"GigE": "GigE", # rare but exists on some XR platforms
"Bundle-Ether": "Bundle-Ether",
"BE": "Bundle-Ether",
# Common to both
"Hu": "HundredGigE",
"BDI": "BDI",
"BVI": "BVI",
"Lo": "Loopback",
}
# Reverse map: long -> short (built from the canonical mapping)
CISCO_LONG_TO_SHORT = {}
for _short, _long in CISCO_SHORT_TO_LONG.items():
# If multiple shorts map to the same long, prefer the shorter abbreviation
if _long not in CISCO_LONG_TO_SHORT or len(_short) < len(CISCO_LONG_TO_SHORT[_long]):
CISCO_LONG_TO_SHORT[_long] = _short
# ────────────────────────────────────────────────────────────────────────
# OID constants
# ────────────────────────────────────────────────────────────────────────
OID_SYS_DESCR = ".1.3.6.1.2.1.1.1.0"
OID_SYS_UPTIME = ".1.3.6.1.2.1.1.3.0"
OID_SYS_NAME = ".1.3.6.1.2.1.1.5.0"
# IF-MIB base prefixes (append .{ifIndex})
OID_IF_DESCR = ".1.3.6.1.2.1.2.2.1.2"
OID_IF_TYPE = ".1.3.6.1.2.1.2.2.1.3"
OID_IF_MTU = ".1.3.6.1.2.1.2.2.1.4"
OID_IF_SPEED = ".1.3.6.1.2.1.2.2.1.5"
OID_IF_ADMIN_STATUS = ".1.3.6.1.2.1.2.2.1.7"
OID_IF_OPER_STATUS = ".1.3.6.1.2.1.2.2.1.8"
OID_IF_IN_OCTETS = ".1.3.6.1.2.1.2.2.1.10"
OID_IF_IN_DISCARDS = ".1.3.6.1.2.1.2.2.1.13"
OID_IF_IN_ERRORS = ".1.3.6.1.2.1.2.2.1.14"
OID_IF_OUT_OCTETS = ".1.3.6.1.2.1.2.2.1.16"
OID_IF_OUT_DISCARDS = ".1.3.6.1.2.1.2.2.1.19"
OID_IF_OUT_ERRORS = ".1.3.6.1.2.1.2.2.1.20"
# IF-MIB ifXTable
OID_IF_NAME = ".1.3.6.1.2.1.31.1.1.1.1"
OID_IF_HC_IN_OCTETS = ".1.3.6.1.2.1.31.1.1.1.6"
OID_IF_HC_OUT_OCTETS = ".1.3.6.1.2.1.31.1.1.1.10"
OID_IF_HIGH_SPEED = ".1.3.6.1.2.1.31.1.1.1.15"
OID_IF_ALIAS = ".1.3.6.1.2.1.31.1.1.1.18"
# ifStackTable: .1.3.6.1.2.1.31.1.2.1.3.{higher}.{lower}
OID_IF_STACK_STATUS = ".1.3.6.1.2.1.31.1.2.1.3"
# ENTITY-MIB
OID_ENT_PHYS_PREFIX = ".1.3.6.1.2.1.47.1.1.1.1"
# CISCO-ENTITY-SENSOR-MIB
OID_CISCO_SENSOR_PREFIX = ".1.3.6.1.4.1.9.9.91.1.1.1.1"
# ────────────────────────────────────────────────────────────────────────
# Walk file parser
# ────────────────────────────────────────────────────────────────────────
def _parse_lines(lines):
"""Parse snmpbulkwalk/snmpget -On -OQ output lines into {oid: value} dict.
Handles multi-line quoted values (e.g., Cisco sysDescr).
String values have surrounding quotes stripped.
"""
oid_data = {}
pending_oid = None
pending_val = None
for raw_line in lines:
line = raw_line.rstrip("\n\r")
if pending_oid is not None:
pending_val += " " + line.strip()
if '"' in line:
val = pending_val.strip()
if val.startswith('"'):
val = val[1:]
if val.endswith('"'):
val = val[:-1]
oid_data[pending_oid] = val
pending_oid = None
pending_val = None
continue
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split(" = ", 1)
if len(parts) != 2:
continue
oid = parts[0].strip()
value = parts[1].strip()
if value.startswith('"') and not value.endswith('"'):
pending_oid = oid
pending_val = value
continue
if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
value = value[1:-1]
oid_data[oid] = value
if pending_oid is not None:
val = pending_val.strip().strip('"')
oid_data[pending_oid] = val
return oid_data
def parse_walk_file(walk_file):
"""Parse an snmpbulkwalk -On -OQ output file into {oid: value} dict."""
walk_path = Path(walk_file)
with walk_path.open("r", errors="replace") as fh:
return _parse_lines(fh)
def parse_walk_text(text):
"""Parse snmpbulkwalk/snmpget -On -OQ output from a string.
Useful for in-process parsing without writing to a file first.
"""
return _parse_lines(text.splitlines())
# ────────────────────────────────────────────────────────────────────────
# Helpers
# ────────────────────────────────────────────────────────────────────────
def _get(oid_data, oid, default=""):
"""Safely retrieve a value from oid_data."""
return oid_data.get(oid, default)
def _get_subtree(oid_data, prefix):
"""Return all entries whose OID starts with prefix (with trailing dot)."""
dotted = prefix if prefix.endswith(".") else prefix + "."
return {k: v for k, v in oid_data.items() if k.startswith(dotted) or k == prefix}
def _expand_short_name(short_name):
"""Expand a Cisco short interface name to full form.
'Te1/1/3' -> 'TenGigabitEthernet1/1/3'
'Gi1/0/39' -> 'GigabitEthernet1/0/39'
"""
# Try each prefix, longest first to avoid partial matches (e.g., Twe before Tw)
for short in sorted(CISCO_SHORT_TO_LONG.keys(), key=len, reverse=True):
if short_name.startswith(short):
remainder = short_name[len(short):]
# Verify the remainder starts with a digit or slash (real interface numbering)
if remainder and (remainder[0].isdigit() or remainder[0] == "/"):
return CISCO_SHORT_TO_LONG[short] + remainder
return short_name
def _shorten_name(long_name):
"""Shorten a Cisco full interface name to abbreviated form.
'TenGigabitEthernet1/1/3' -> 'Te1/1/3'
"""
for long, short in sorted(CISCO_LONG_TO_SHORT.items(), key=lambda x: len(x[0]), reverse=True):
if long_name.startswith(long):
return short + long_name[len(long):]
return long_name
# ────────────────────────────────────────────────────────────────────────
# System info extraction
# ────────────────────────────────────────────────────────────────────────
def get_system_info(oid_data):
"""Extract system-level information."""
sys_descr = _get(oid_data, OID_SYS_DESCR)
sys_uptime = _get(oid_data, OID_SYS_UPTIME)
sys_name = _get(oid_data, OID_SYS_NAME)
os_type = "unknown"
platform = ""
if "IOS-XR" in sys_descr:
os_type = "IOS-XR"
# Try to extract version
ver_match = re.search(r"Version\s+([\d.]+)", sys_descr)
version = ver_match.group(1) if ver_match else ""
# Try to extract platform from sysDescr
plat_match = re.search(r"Cisco\s+(\S+)", sys_descr)
plat_name = plat_match.group(1) if plat_match else "Cisco"
platform = f"Cisco {plat_name} IOS-XR {version}".strip()
elif "IOS Software" in sys_descr or "Cisco IOS" in sys_descr:
os_type = "IOS-XE"
# Try to extract version
ver_match = re.search(r"Version\s+([\d.()A-Za-z]+)", sys_descr)
version = ver_match.group(1) if ver_match else ""
# Try to extract platform (e.g., CAT3K, C3850, Catalyst)
plat_match = re.search(r"(CAT\w+|C\d\w+|Catalyst\s+\S+)", sys_descr)
if not plat_match:
plat_match = re.search(r"Cisco\s+(\S+)", sys_descr)
plat_name = plat_match.group(1) if plat_match else "Cisco"
platform = f"Cisco {plat_name} IOS-XE {version}".strip()
return {
"sysName": sys_name,
"sysDescr": sys_descr,
"sysUpTime": sys_uptime,
"platform": platform,
"osType": os_type,
}
# ────────────────────────────────────────────────────────────────────────
# Interface index builder
# ────────────────────────────────────────────────────────────────────────
def build_interface_index(oid_data):
"""Build a dict of ifIndex -> {ifDescr, ifName, ...} from walk data."""
interfaces = {}
# Collect ifDescr entries
descr_prefix = OID_IF_DESCR + "."
for oid, value in oid_data.items():
if oid.startswith(descr_prefix):
ifindex = oid[len(descr_prefix):]
if ifindex not in interfaces:
interfaces[ifindex] = {}
interfaces[ifindex]["ifDescr"] = value
# Collect ifName entries
name_prefix = OID_IF_NAME + "."
for oid, value in oid_data.items():
if oid.startswith(name_prefix):
ifindex = oid[len(name_prefix):]
if ifindex not in interfaces:
interfaces[ifindex] = {}
interfaces[ifindex]["ifName"] = value
return interfaces
# ────────────────────────────────────────────────────────────────────────
# remPortId matching
# ────────────────────────────────────────────────────────────────────────
def match_rem_port_id(interfaces, rem_port_id):
"""Match a remPortId string to an ifIndex.
Tries:
1. Exact match against ifName values
2. Expand short name and match against ifDescr
3. Direct match against ifDescr (already long form)
Returns ifIndex as string, or None.
"""
# 1. Exact match against ifName
for ifindex, info in interfaces.items():
if info.get("ifName", "") == rem_port_id:
_dbg(f"Matched remPortId '{rem_port_id}' via ifName -> ifIndex {ifindex}")
return ifindex
# 2. Expand short name and match ifDescr
expanded = _expand_short_name(rem_port_id)
if expanded != rem_port_id:
for ifindex, info in interfaces.items():
if info.get("ifDescr", "") == expanded:
_dbg(f"Matched remPortId '{rem_port_id}' (expanded: '{expanded}') via ifDescr -> ifIndex {ifindex}")
return ifindex
# 3. Direct match against ifDescr
for ifindex, info in interfaces.items():
if info.get("ifDescr", "") == rem_port_id:
_dbg(f"Matched remPortId '{rem_port_id}' via ifDescr direct -> ifIndex {ifindex}")
return ifindex
# 4. Cross-platform fallback: IOS-XE short name on IOS-XR device or vice versa
# e.g., remPortId "Te0/0/0/5" expands to "TenGigabitEthernet0/0/0/5" (IOS-XE)
# but the NCS ifDescr is "TenGigE0/0/0/5" (IOS-XR)
# Try alternate expansions by extracting the slot/port suffix and matching
xr_xe_pairs = {
"TenGigabitEthernet": "TenGigE",
"TenGigE": "TenGigabitEthernet",
"GigabitEthernet": "GigE",
"GigE": "GigabitEthernet",
"FortyGigabitEthernet": "FortyGigE",
"FortyGigE": "FortyGigabitEthernet",
"Port-channel": "Bundle-Ether",
"Bundle-Ether": "Port-channel",
}
if expanded != rem_port_id:
for xe_name, xr_name in xr_xe_pairs.items():
if expanded.startswith(xe_name):
alt = xr_name + expanded[len(xe_name):]
for ifindex, info in interfaces.items():
if info.get("ifDescr", "") == alt:
_dbg(f"Matched remPortId '{rem_port_id}' via cross-platform '{alt}' -> ifIndex {ifindex}")
return ifindex
_dbg(f"WARNING: Could not match remPortId '{rem_port_id}' to any interface")
return None
# ────────────────────────────────────────────────────────────────────────
# Interface facts extraction
# ────────────────────────────────────────────────────────────────────────
def get_interface_facts(oid_data, ifindex):
"""Extract all relevant facts for a given ifIndex."""
idx = str(ifindex)
return {
"ifDescr": _get(oid_data, f"{OID_IF_DESCR}.{idx}"),
"ifType": _get(oid_data, f"{OID_IF_TYPE}.{idx}"),
"ifMtu": _get(oid_data, f"{OID_IF_MTU}.{idx}"),
"ifSpeed": _get(oid_data, f"{OID_IF_SPEED}.{idx}"),
"ifAdminStatus": _get(oid_data, f"{OID_IF_ADMIN_STATUS}.{idx}"),
"ifOperStatus": _get(oid_data, f"{OID_IF_OPER_STATUS}.{idx}"),
"ifInOctets": _get(oid_data, f"{OID_IF_IN_OCTETS}.{idx}"),
"ifInErrors": _get(oid_data, f"{OID_IF_IN_ERRORS}.{idx}"),
"ifOutOctets": _get(oid_data, f"{OID_IF_OUT_OCTETS}.{idx}"),
"ifOutErrors": _get(oid_data, f"{OID_IF_OUT_ERRORS}.{idx}"),
"ifInDiscards": _get(oid_data, f"{OID_IF_IN_DISCARDS}.{idx}"),
"ifOutDiscards": _get(oid_data, f"{OID_IF_OUT_DISCARDS}.{idx}"),
"ifName": _get(oid_data, f"{OID_IF_NAME}.{idx}"),
"ifHighSpeed": _get(oid_data, f"{OID_IF_HIGH_SPEED}.{idx}"),
"ifAlias": _get(oid_data, f"{OID_IF_ALIAS}.{idx}"),
"ifHCInOctets": _get(oid_data, f"{OID_IF_HC_IN_OCTETS}.{idx}"),
"ifHCOutOctets": _get(oid_data, f"{OID_IF_HC_OUT_OCTETS}.{idx}"),
}
# ────────────────────────────────────────────────────────────────────────
# Subinterface discovery
# ────────────────────────────────────────────────────────────────────────
def discover_subinterfaces_stack(oid_data, parent_ifindex):
"""Discover child interfaces via ifStackTable.
In ifStackTable, .1.3.6.1.2.1.31.1.2.1.3.{higher}.{lower} = status
When lower == parent_ifindex, higher is a child/sub-interface layered
on top of the parent. Skip ifIndex 0 (represents 'not stacked').
"""
children = set()
prefix = OID_IF_STACK_STATUS + "."
parent_suffix = f".{parent_ifindex}"
for oid in oid_data:
if not oid.startswith(prefix):
continue
remainder = oid[len(prefix):]
parts = remainder.split(".", 1)
if len(parts) != 2:
continue
higher, lower = parts[0], parts[1]
if lower == str(parent_ifindex) and higher != "0":
children.add(higher)
return children
def discover_subinterfaces_pattern(oid_data, parent_descr):
"""Discover subinterfaces by name pattern matching.
Look for ifDescr values like '{parent_descr}.{number}'.
"""
children = {}
prefix = OID_IF_DESCR + "."
pattern = parent_descr + "."
for oid, value in oid_data.items():
if not oid.startswith(prefix):
continue
if value.startswith(pattern):
ifindex = oid[len(prefix):]
children[ifindex] = value
return children
def discover_subinterfaces(oid_data, parent_ifindex, parent_descr):
"""Combine stack-based and pattern-based subinterface discovery."""
# Approach A: ifStackTable
stack_children = discover_subinterfaces_stack(oid_data, parent_ifindex)
# Approach B: pattern matching
pattern_children = discover_subinterfaces_pattern(oid_data, parent_descr)
# Merge: union of both sets of ifIndex values
all_child_indices = set(stack_children) | set(pattern_children.keys())
return all_child_indices
# ────────────────────────────────────────────────────────────────────────
# VLAN / SVI discovery (IOS-XE)
# ────────────────────────────────────────────────────────────────────────
def discover_vlans(oid_data, os_type):
"""Discover VLAN SVIs for IOS-XE devices.
On IOS-XE Catalyst switches, scan ifDescr for 'Vlan{N}' entries
and collect their interface facts.
"""
vlans = {}
if os_type != "IOS-XE":
return vlans
prefix = OID_IF_DESCR + "."
vlan_re = re.compile(r"^Vlan(\d+)$")
for oid, value in oid_data.items():
if not oid.startswith(prefix):
continue
m = vlan_re.match(value)
if not m:
continue
vlan_num = m.group(1)
ifindex = oid[len(prefix):]
vlans[vlan_num] = {
"ifIndex": ifindex,
"ifDescr": value,
"ifAlias": _get(oid_data, f"{OID_IF_ALIAS}.{ifindex}"),
"ifAdminStatus": _get(oid_data, f"{OID_IF_ADMIN_STATUS}.{ifindex}"),
"ifOperStatus": _get(oid_data, f"{OID_IF_OPER_STATUS}.{ifindex}"),
}
return vlans
# ────────────────────────────────────────────────────────────────────────
# BDI / BVI correlation
# ────────────────────────────────────────────────────────────────────────
def find_bdi_bvi(oid_data, vlan_id):
"""Find a BDI{vlan_id} or BVI{vlan_id} interface if it exists.
Returns (ifIndex, ifDescr, ifOperStatus, ifAlias) or (None, None, None, None).
"""
prefix = OID_IF_DESCR + "."
targets = [f"BDI{vlan_id}", f"BVI{vlan_id}"]
for oid, value in oid_data.items():
if not oid.startswith(prefix):
continue
if value in targets:
ifindex = oid[len(prefix):]
return (
ifindex,
value,
_get(oid_data, f"{OID_IF_OPER_STATUS}.{ifindex}"),
_get(oid_data, f"{OID_IF_ALIAS}.{ifindex}"),
)
return (None, None, None, None)
# ────────────────────────────────────────────────────────────────────────
# Optics / Entity Sensor (best-effort)
# ────────────────────────────────────────────────────────────────────────
def get_optics_info(oid_data, parent_ifindex, parent_descr):
"""Best-effort extraction of optics data from ENTITY-MIB and
CISCO-ENTITY-SENSOR-MIB.
Strategy:
1. Walk entPhysicalName (.1.3.6.1.2.1.47.1.1.1.1.7) to find entities
whose name contains the interface name/description.
2. For those entity indices, look up sensor values from
CISCO-ENTITY-SENSOR-MIB (.1.3.6.1.4.1.9.9.91.1.1.1.1.4.{entIdx}).
3. Use entPhysicalDescr or entSensorType to classify as Tx/Rx/temp.
Returns dict with txPower, rxPower, temperature (all may be None).
"""
result = {
"txPower": None,
"rxPower": None,
"temperature": None,
}
# entPhysicalName: .1.3.6.1.2.1.47.1.1.1.1.7.{entIdx}
ent_name_prefix = OID_ENT_PHYS_PREFIX.rsplit(".", 1)[0] + ".7."
# Actually: .1.3.6.1.2.1.47.1.1.1.1.7.{idx}
ent_name_prefix = ".1.3.6.1.2.1.47.1.1.1.1.7."
# entPhysicalDescr: .1.3.6.1.2.1.47.1.1.1.1.2.{idx}
ent_descr_prefix = ".1.3.6.1.2.1.47.1.1.1.1.2."
# entSensorValue: .1.3.6.1.4.1.9.9.91.1.1.1.1.4.{idx}
sensor_value_prefix = ".1.3.6.1.4.1.9.9.91.1.1.1.1.4."
# entSensorType: .1.3.6.1.4.1.9.9.91.1.1.1.1.1.{idx}
sensor_type_prefix = ".1.3.6.1.4.1.9.9.91.1.1.1.1.1."
# entSensorPrecision: .1.3.6.1.4.1.9.9.91.1.1.1.1.3.{idx}
# Number of decimal places to apply to entSensorValue
sensor_precision_prefix = ".1.3.6.1.4.1.9.9.91.1.1.1.1.3."
# entSensorScale: .1.3.6.1.4.1.9.9.91.1.1.1.1.2.{idx}
# Scale factor (1=yocto..9=units..17=exa) — 9 means no scaling
sensor_scale_prefix = ".1.3.6.1.4.1.9.9.91.1.1.1.1.2."
# Find entity indices that match the interface
short_name = _shorten_name(parent_descr)
matching_ent_indices = []
for oid, value in oid_data.items():
if not oid.startswith(ent_name_prefix):
continue
# Check if entity name references our interface
if parent_descr in value or short_name in value:
ent_idx = oid[len(ent_name_prefix):]
matching_ent_indices.append(ent_idx)
if not matching_ent_indices:
return result
_dbg(f"Found {len(matching_ent_indices)} entity entries for {parent_descr}")
def _scale_sensor_value(raw_val, ent_idx):
"""Apply entSensorPrecision to scale a raw sensor value."""
try:
val = float(raw_val)
except (ValueError, TypeError):
return raw_val
precision = _get(oid_data, f"{sensor_precision_prefix}{ent_idx}")
try:
prec = int(precision)
except (ValueError, TypeError):
prec = 0
if prec > 0:
val = val / (10 ** prec)
# Round to avoid floating point noise
return str(round(val, prec if prec > 0 else 1))
# For matching entities, look up sensor readings
for ent_idx in matching_ent_indices:
sensor_val = _get(oid_data, f"{sensor_value_prefix}{ent_idx}")
if not sensor_val:
continue
# Determine sensor type from entSensorType or entPhysicalDescr
sensor_type = _get(oid_data, f"{sensor_type_prefix}{ent_idx}")
ent_descr = _get(oid_data, f"{ent_descr_prefix}{ent_idx}").lower()
# Scale the raw value using entSensorPrecision
scaled = _scale_sensor_value(sensor_val, ent_idx)
# entSensorType: 8 = celsius, 14 = dBm
# Also check description text for classification
if sensor_type == "8" or "temperature" in ent_descr or "temp" in ent_descr:
result["temperature"] = scaled
elif "transmit" in ent_descr or "tx" in ent_descr:
result["txPower"] = scaled
elif "receive" in ent_descr or "rx" in ent_descr:
result["rxPower"] = scaled
elif sensor_type == "14":
# dBm but unclassified — assign to first empty power slot
if result["txPower"] is None:
result["txPower"] = scaled
elif result["rxPower"] is None:
result["rxPower"] = scaled
return result
# ────────────────────────────────────────────────────────────────────────
# Debug helper
# ────────────────────────────────────────────────────────────────────────
def _dbg(msg):
"""Print debug/progress info to stderr."""
print(f"[cisco-parse] {msg}", file=sys.stderr)
# ────────────────────────────────────────────────────────────────────────
# Main builder
# ────────────────────────────────────────────────────────────────────────
def build_neighbor_output(oid_data, rem_port_id):
"""Build the structured neighbor output from parsed OID data.
Args:
oid_data: dict from parse_walk_file()
rem_port_id: interface identifier to focus on (e.g., 'Te1/1/3')
Returns:
dict matching the output JSON structure
"""
# System info
sys_info = get_system_info(oid_data)
_dbg(f"System: {sys_info['sysName']} ({sys_info['osType']})")
# Build interface index
interfaces = build_interface_index(oid_data)
_dbg(f"Found {len(interfaces)} interfaces in walk data")
# Match remPortId to ifIndex
matched_ifindex = match_rem_port_id(interfaces, rem_port_id)
# Build queried interface section
queried_iface = {"remPortId": rem_port_id}
parent_descr = ""
if matched_ifindex is not None:
facts = get_interface_facts(oid_data, matched_ifindex)
parent_descr = facts.get("ifDescr", "")
queried_iface.update({
"ifIndex": matched_ifindex,
"ifDescr": facts["ifDescr"],
"ifName": facts["ifName"],
"ifAlias": facts["ifAlias"],
"ifType": facts["ifType"],
"ifAdminStatus": facts["ifAdminStatus"],
"ifOperStatus": facts["ifOperStatus"],
"ifSpeed": facts["ifSpeed"],
"ifHighSpeed": facts["ifHighSpeed"],
"ifMtu": facts["ifMtu"],
"ifHCInOctets": facts["ifHCInOctets"],
"ifHCOutOctets": facts["ifHCOutOctets"],
"ifInErrors": facts["ifInErrors"],
"ifOutErrors": facts["ifOutErrors"],
"ifInDiscards": facts["ifInDiscards"],
"ifOutDiscards": facts["ifOutDiscards"],
})
_dbg(f"Queried interface: {parent_descr} (ifIndex {matched_ifindex})")
else:
# Fill with empty values so the JSON schema is consistent
for key in ("ifIndex", "ifDescr", "ifName", "ifAlias", "ifType",
"ifAdminStatus", "ifOperStatus", "ifSpeed", "ifHighSpeed",
"ifMtu", "ifHCInOctets", "ifHCOutOctets", "ifInErrors",
"ifOutErrors", "ifInDiscards", "ifOutDiscards"):
queried_iface[key] = ""
_dbg("WARNING: No interface matched, output will have empty queried_interface")
# Discover subinterfaces
subinterfaces = {}
if matched_ifindex and parent_descr:
child_indices = discover_subinterfaces(oid_data, matched_ifindex, parent_descr)
_dbg(f"Found {len(child_indices)} subinterfaces")
for child_idx in sorted(child_indices, key=lambda x: int(x) if x.isdigit() else 0):
child_facts = get_interface_facts(oid_data, child_idx)
child_descr = child_facts.get("ifDescr", "")
# Extract VLAN ID from .NNNN suffix
vlan_id = ""
vlan_match = re.search(r"\.(\d+)$", child_descr)
if vlan_match:
vlan_id = vlan_match.group(1)
# BDI/BVI correlation
bvi_ifindex, bvi_descr, bvi_oper, bvi_alias = (None, None, None, None)
if vlan_id:
bvi_ifindex, bvi_descr, bvi_oper, bvi_alias = find_bdi_bvi(oid_data, vlan_id)
subinterfaces[child_idx] = {
"ifDescr": child_descr,
"ifName": child_facts.get("ifName", ""),
"ifAlias": child_facts.get("ifAlias", ""),
"ifAdminStatus": child_facts.get("ifAdminStatus", ""),
"ifOperStatus": child_facts.get("ifOperStatus", ""),
"vlanId": vlan_id,
"bvi_ifIndex": bvi_ifindex,
"bvi_ifDescr": bvi_descr,
"bvi_ifOperStatus": bvi_oper,
"bvi_ifAlias": bvi_alias,
}
# Discover VLANs (IOS-XE)
vlans = discover_vlans(oid_data, sys_info["osType"])
_dbg(f"Found {len(vlans)} VLAN SVIs")
# Optics (best-effort)
optics = {"txPower": None, "rxPower": None, "temperature": None}
if matched_ifindex and parent_descr:
optics = get_optics_info(oid_data, matched_ifindex, parent_descr)
# Assemble output
output = {
"neighbor_system": sys_info,
"queried_interface": queried_iface,
"subinterfaces": subinterfaces,
"vlans": vlans,
"optics": optics,
"_meta": {
"polled_at": datetime.now(timezone.utc).isoformat(),
"target_ip": "",
"walk_lines": 0,
"elapsed_sec": 0.0,
"remPortId": rem_port_id,
},
}
return output
# ────────────────────────────────────────────────────────────────────────
# CLI entry point
# ────────────────────────────────────────────────────────────────────────
def main():
if len(sys.argv) < 3:
print("Usage: cisco-parse.py <walk_file> <remPortId> [output.json]")
print()
print("Arguments:")
print(" walk_file Raw snmpbulkwalk -On -OQ output file")
print(" remPortId Interface to focus on (e.g., Te1/1/3, Gi1/0/39)")
print(" output.json Optional output path (default: {stem}_neighbor_monitoring.json)")
sys.exit(1)
walk_file = Path(sys.argv[1])
rem_port_id = sys.argv[2]
output_path = (
Path(sys.argv[3])
if len(sys.argv) > 3
else walk_file.with_name(walk_file.stem + "_neighbor_monitoring.json")
)
if not walk_file.exists():
print(f"Error: walk file not found: {walk_file}", file=sys.stderr)
sys.exit(1)
_dbg(f"Parsing {walk_file}")
_dbg(f"Looking for remPortId: {rem_port_id}")
t_start = time.time()
oid_data = parse_walk_file(walk_file)
_dbg(f"Parsed {len(oid_data)} OID entries")
result = build_neighbor_output(oid_data, rem_port_id)
result["_meta"]["walk_lines"] = len(oid_data)
result["_meta"]["elapsed_sec"] = round(time.time() - t_start, 2)
# Try to extract target IP from filename pattern: {IP}_{timestamp}_..._walk.txt
# e.g., 172-16-50-4_2026-03-01_10-00-00_neighbor_walk.txt
name = walk_file.stem
ip_match = re.match(r"([\d-]+)_", name)
if ip_match:
result["_meta"]["target_ip"] = ip_match.group(1).replace("-", ".")
output_path.write_text(json.dumps(result, indent=2))
print(f"Wrote {output_path}")
if __name__ == "__main__":
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 100 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.9 KiB

View File

@ -30,6 +30,44 @@ from pathlib import Path
SCRIPT_DIR = Path(__file__).resolve().parent
WALKS_DIR = SCRIPT_DIR / "walks"
# ── Phase 1: Discovery OIDs (lightweight, column-specific walks) ──────
# On an NCS 5500 with 10k+ interfaces, walking full ifTable/ifXTable would
# return ~150k OIDs. Instead, walk only the columns needed to identify the
# target interface and its children. ~2 OIDs per interface for discovery.
NEIGHBOR_DISCOVERY_OIDS = [
(".1.3.6.1.2.1.1", "System"), # ~8 OIDs
(".1.3.6.1.2.1.2.2.1.2", "ifDescr"), # 1 column: ifDescr
(".1.3.6.1.2.1.31.1.1.1.1", "ifName"), # 1 column: ifName
(".1.3.6.1.2.1.31.1.2", "ifStackTable"), # parent-child relationships
]
# ── Phase 2: Per-interface OID suffixes for targeted snmpget ─────────
# After matching the target ifIndex + children, we GET only these OIDs
# for each relevant interface. ~15 OIDs per interface instead of ~150k total.
NEIGHBOR_INTERFACE_OID_BASES = [
".1.3.6.1.2.1.2.2.1.3", # ifType
".1.3.6.1.2.1.2.2.1.4", # ifMtu
".1.3.6.1.2.1.2.2.1.5", # ifSpeed
".1.3.6.1.2.1.2.2.1.7", # ifAdminStatus
".1.3.6.1.2.1.2.2.1.8", # ifOperStatus
".1.3.6.1.2.1.2.2.1.10", # ifInOctets
".1.3.6.1.2.1.2.2.1.13", # ifInDiscards
".1.3.6.1.2.1.2.2.1.14", # ifInErrors
".1.3.6.1.2.1.2.2.1.16", # ifOutOctets
".1.3.6.1.2.1.2.2.1.19", # ifOutDiscards
".1.3.6.1.2.1.2.2.1.20", # ifOutErrors
".1.3.6.1.2.1.31.1.1.1.6", # ifHCInOctets
".1.3.6.1.2.1.31.1.1.1.10", # ifHCOutOctets
".1.3.6.1.2.1.31.1.1.1.15", # ifHighSpeed
".1.3.6.1.2.1.31.1.1.1.18", # ifAlias
]
# ── Extra subtrees to walk for IOS-XE VLAN/optics (Phase 2 optional) ─
NEIGHBOR_EXTRA_OIDS = [
(".1.3.6.1.2.1.47.1.1.1", "entPhysicalTable"),
(".1.3.6.1.4.1.9.9.91.1.1.1", "ciscoEntitySensor"),
]
# ── OID subtrees for targeted walk (mirrors snmp-walk.sh) ────────────
TARGETED_OIDS = [
(".1.3.6.1.2.1.1", "System"),
@ -41,7 +79,6 @@ TARGETED_OIDS = [
(".1.3.6.1.4.1.22420.1.1", "ACD-DESC-MIB"),
(".1.3.6.1.4.1.22420.2.1", "ACD-ALARM-MIB"),
(".1.3.6.1.4.1.22420.2.2", "ACD-FILTER-MIB"),
(".1.3.6.1.4.1.22420.2.3", "ACD-POLICY-MIB"),
(".1.3.6.1.4.1.22420.2.4", "ACD-SFP-MIB"),
(".1.3.6.1.4.1.22420.2.6", "ACD-REGULATOR-MIB"),
(".1.3.6.1.4.1.22420.2.8", "ACD-SMAP-MIB"),
@ -79,6 +116,14 @@ SNMP_V3_PRIV_PROTO = ENV.get("SNMP_V3_PRIV_PROTO", "AES")
SNMP_V3_PRIV_PASS = ENV.get("SNMP_V3_PRIV_PASS", "")
SNMP_V3_SEC_LEVEL = ENV.get("SNMP_V3_SEC_LEVEL", "authPriv")
# Conditionally include heavy policy MIB (~73% of all OIDs)
if ENV.get("SNMP_WALK_POLICIES", "true").lower() == "true":
TARGETED_OIDS.append((".1.3.6.1.4.1.22420.2.3", "ACD-POLICY-MIB"))
# Neighbor device SNMP credentials (falls back to NID creds)
NEIGHBOR_SNMP_VERSION = ENV.get("NEIGHBOR_SNMP_VERSION", SNMP_VERSION)
NEIGHBOR_SNMP_COMMUNITY = ENV.get("NEIGHBOR_SNMP_COMMUNITY", SNMP_COMMUNITY)
# ── Walk state (shared across threads) ───────────────────────────────
walk_lock = threading.Lock()
@ -95,6 +140,11 @@ walk_version = 0
# Path to latest monitoring JSON (set after successful walk)
latest_json = None
# Neighbor walk state (independent from NID walk)
neighbor_lock = threading.Lock()
neighbor_status = {} # keyed by target IP: {"state": ..., "message": ..., "json_path": ...}
latest_neighbor = {} # keyed by target IP: path to latest neighbor monitoring JSON
def set_status(state, message="", progress=0, **extra):
global walk_version
@ -125,7 +175,7 @@ def build_snmp_auth() -> list:
return ["-v", SNMP_VERSION, "-c", SNMP_COMMUNITY]
def run_walk(target: str, mode: str):
def run_walk(target: str, mode: str, policies: bool = True):
"""Execute the full walk pipeline in a background thread."""
global latest_json
@ -142,6 +192,11 @@ def run_walk(target: str, mode: str):
auth = build_snmp_auth()
t_start = time.time()
# Build OID list for this walk — optionally exclude heavy policy MIB
walk_oids = list(TARGETED_OIDS)
if not policies:
walk_oids = [(oid, lbl) for oid, lbl in walk_oids if lbl != "ACD-POLICY-MIB"]
try:
# ── Step 1: snmpwalk ──────────────────────────────────────
# Use snmpbulkwalk (GETBULK PDUs) when available — much faster
@ -157,7 +212,7 @@ def run_walk(target: str, mode: str):
walk_file.write_text(result.stdout)
else:
# Walk subtrees in parallel for speed
total = len(TARGETED_OIDS)
total = len(walk_oids)
completed = [0] # mutable counter for progress
results_map = {}
@ -175,7 +230,7 @@ def run_walk(target: str, mode: str):
with ThreadPoolExecutor(max_workers=4) as pool:
futures = [
pool.submit(walk_subtree, i, oid, label)
for i, (oid, label) in enumerate(TARGETED_OIDS)
for i, (oid, label) in enumerate(walk_oids)
]
for fut in as_completed(futures):
idx, output = fut.result()
@ -224,6 +279,258 @@ def run_walk(target: str, mode: str):
set_status("error", message=str(e)[:300])
# ── Neighbor device walk ──────────────────────────────────────────────
def build_neighbor_snmp_auth() -> list:
"""Build snmpwalk auth flags for neighbor device (falls back to NID creds)."""
if NEIGHBOR_SNMP_VERSION == "3":
# Future: support v3 for neighbor
return build_snmp_auth()
return ["-v", NEIGHBOR_SNMP_VERSION, "-c", NEIGHBOR_SNMP_COMMUNITY]
def _walk_subtrees_parallel(walk_cmd, auth, target, oid_list, status_prefix=""):
"""Walk a list of (oid, label) subtrees in parallel. Returns combined text."""
total = len(oid_list)
completed = [0]
results_map = {}
def walk_one(idx, oid, label):
try:
res = subprocess.run(
[walk_cmd, "-On", "-OQ"] + auth + [target, oid],
capture_output=True, text=True, timeout=60,
)
completed[0] += 1
with neighbor_lock:
neighbor_status[target] = {
"state": "walking",
"message": f"{status_prefix}({completed[0]}/{total})",
}
return idx, res.stdout
except subprocess.TimeoutExpired:
completed[0] += 1
return idx, ""
with ThreadPoolExecutor(max_workers=4) as pool:
futures = [
pool.submit(walk_one, i, oid, label)
for i, (oid, label) in enumerate(oid_list)
]
for fut in as_completed(futures):
idx, output = fut.result()
if output.strip():
results_map[idx] = output
return "\n".join(results_map[i] for i in sorted(results_map))
def _snmpget_batch(walk_cmd, auth, target, oid_list):
"""Run snmpget for a batch of specific OIDs. Returns raw output text.
Uses snmpget (not bulkwalk) since we're requesting exact OIDs.
Falls back to individual gets if batch fails.
"""
if not oid_list:
return ""
# snmpget can handle multiple OIDs in one call (much faster than individual)
# Split into chunks of 30 to avoid command-line length limits
all_output = []
for i in range(0, len(oid_list), 30):
chunk = oid_list[i:i + 30]
try:
res = subprocess.run(
["snmpget", "-On", "-OQ", "-Oe"] + auth + [target] + chunk,
capture_output=True, text=True, timeout=30,
)
if res.stdout.strip():
all_output.append(res.stdout)
except subprocess.TimeoutExpired:
pass
return "\n".join(all_output)
def run_neighbor_walk(target: str, rem_port_id: str, rem_sys_name: str = ""):
"""Execute a two-phase focused SNMP walk against an LLDP neighbor device.
Phase 1 (Discovery): Walk System + ifDescr + ifName + ifStackTable
Identify target ifIndex and child subinterfaces
Phase 2 (Targeted): snmpget ~15 OIDs per matched interface only
Full interface facts for only the relevant interfaces
On an NCS 5500 with 10k interfaces, this reduces from ~150k OIDs
to ~20k discovery + ~600 targeted GETs.
"""
ip_re = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")
if not ip_re.match(target):
with neighbor_lock:
neighbor_status[target] = {"state": "error", "message": f"Invalid IP: {target}"}
return
with neighbor_lock:
neighbor_status[target] = {"state": "walking", "message": "Phase 1: Discovering interfaces..."}
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
safe_ip = target.replace(".", "-")
walk_file = WALKS_DIR / f"{safe_ip}_{timestamp}_neighbor_walk.txt"
WALKS_DIR.mkdir(parents=True, exist_ok=True)
auth = build_neighbor_snmp_auth()
walk_cmd = "snmpbulkwalk" if shutil.which("snmpbulkwalk") else "snmpwalk"
t_start = time.time()
try:
# ── Phase 1: Discovery walk ──────────────────────────────────
discovery_output = _walk_subtrees_parallel(
walk_cmd, auth, target, NEIGHBOR_DISCOVERY_OIDS,
status_prefix="Phase 1: Discovery "
)
if not discovery_output.strip():
with neighbor_lock:
neighbor_status[target] = {
"state": "error",
"message": "Discovery walk returned no data — check credentials",
}
return
# Parse discovery data in-process to find target interface
sys.path.insert(0, str(SCRIPT_DIR))
from cisco_parse import parse_walk_text, build_interface_index, match_rem_port_id, \
discover_subinterfaces_stack, discover_subinterfaces_pattern
discovery_oids = parse_walk_text(discovery_output)
interfaces = build_interface_index(discovery_oids)
discovery_count = len(discovery_oids)
with neighbor_lock:
neighbor_status[target] = {
"state": "walking",
"message": f"Phase 1 done: {discovery_count:,} OIDs, {len(interfaces)} interfaces. Matching...",
}
matched_ifindex = match_rem_port_id(interfaces, rem_port_id)
if matched_ifindex is None:
# Write what we have and let cisco-parse produce a best-effort result
walk_file.write_text(discovery_output)
else:
# Find child interfaces (subinterfaces)
parent_descr = interfaces.get(matched_ifindex, {}).get("ifDescr", "")
child_indices = set()
# ifStackTable children
child_indices |= discover_subinterfaces_stack(discovery_oids, matched_ifindex)
# Pattern-based children (ifDescr matching)
if parent_descr:
pattern_children = discover_subinterfaces_pattern(discovery_oids, parent_descr)
child_indices |= set(pattern_children.keys())
# Also look for BDI/BVI interfaces that correlate with subinterfaces
bvi_indices = set()
for child_idx in child_indices:
child_descr = interfaces.get(child_idx, {}).get("ifDescr", "")
vlan_match = re.search(r"\.(\d+)$", child_descr)
if vlan_match:
vlan_id = vlan_match.group(1)
# Search ifDescr for BDI{N} or BVI{N}
for ifidx, info in interfaces.items():
d = info.get("ifDescr", "")
if d == f"BDI{vlan_id}" or d == f"BVI{vlan_id}":
bvi_indices.add(ifidx)
# Also find Vlan{N} SVIs (IOS-XE)
vlan_indices = set()
for ifidx, info in interfaces.items():
d = info.get("ifDescr", "")
if re.match(r"^Vlan\d+$", d):
vlan_indices.add(ifidx)
all_target_indices = {matched_ifindex} | child_indices | bvi_indices | vlan_indices
with neighbor_lock:
neighbor_status[target] = {
"state": "walking",
"message": f"Phase 2: Getting details for {len(all_target_indices)} interfaces...",
}
# ── Phase 2: Targeted snmpget ────────────────────────────
target_oids = []
for ifidx in all_target_indices:
for base_oid in NEIGHBOR_INTERFACE_OID_BASES:
target_oids.append(f"{base_oid}.{ifidx}")
phase2_output = _snmpget_batch(walk_cmd, auth, target, target_oids)
# Also walk optics/entity subtrees (small on most devices)
extra_output = _walk_subtrees_parallel(
walk_cmd, auth, target, NEIGHBOR_EXTRA_OIDS,
status_prefix="Phase 2: Optics/Entity "
)
# Combine all phases into one walk file
combined = discovery_output
if phase2_output.strip():
combined += "\n" + phase2_output
if extra_output.strip():
combined += "\n" + extra_output
walk_file.write_text(combined)
line_count = sum(1 for _ in walk_file.open())
elapsed = round(time.time() - t_start, 1)
if line_count == 0:
with neighbor_lock:
neighbor_status[target] = {
"state": "error",
"message": "Walk returned no data — check credentials",
}
return
# ── Parse combined data with cisco-parse.py ──────────────────
with neighbor_lock:
neighbor_status[target] = {"state": "parsing", "message": "Parsing neighbor data..."}
parse_result = subprocess.run(
[sys.executable, str(SCRIPT_DIR / "cisco_parse.py"),
str(walk_file), rem_port_id],
capture_output=True, text=True, timeout=60,
)
if parse_result.returncode != 0:
with neighbor_lock:
neighbor_status[target] = {
"state": "error",
"message": f"Parse failed: {parse_result.stderr[:200]}",
}
return
neighbor_json = walk_file.with_name(walk_file.stem + "_neighbor_monitoring.json")
if not neighbor_json.is_file():
with neighbor_lock:
neighbor_status[target] = {
"state": "error",
"message": "Parser did not produce neighbor JSON",
}
return
with neighbor_lock:
latest_neighbor[target] = neighbor_json
neighbor_status[target] = {
"state": "complete",
"message": f"Done — {line_count:,} OIDs ({discovery_count:,} discovery + {line_count - discovery_count:,} targeted) in {elapsed}s",
"json_path": str(neighbor_json),
}
except Exception as e:
with neighbor_lock:
neighbor_status[target] = {"state": "error", "message": str(e)[:300]}
# ── Find latest monitoring JSON ──────────────────────────────────────
def find_latest_json() -> Path | None:
@ -261,6 +568,10 @@ class NIDHandler(BaseHTTPRequestHandler):
self._serve_viewer()
elif self.path == "/api/status":
self._serve_sse()
elif self.path.startswith("/api/neighbor-data"):
self._handle_neighbor_data()
elif self.path.startswith("/api/neighbor-status"):
self._handle_neighbor_status()
else:
self._send(404, "text/plain", b"Not found")
@ -277,6 +588,20 @@ class NIDHandler(BaseHTTPRequestHandler):
else:
data = {}
# Merge any available neighbor data into the viewer data
with neighbor_lock:
if latest_neighbor:
nd = {}
for ip, npath in latest_neighbor.items():
if npath and npath.is_file():
try:
with npath.open() as f:
nd[ip] = json.load(f)
except Exception:
pass
if nd:
data["neighbor_data"] = nd
html = build_html(data)
self._send(200, "text/html; charset=utf-8", html.encode())
@ -309,11 +634,41 @@ class NIDHandler(BaseHTTPRequestHandler):
def do_POST(self):
if self.path == "/api/walk":
self._handle_walk()
elif self.path == "/api/ping":
self._handle_ping()
elif self.path == "/api/clear":
self._handle_clear()
elif self.path == "/api/neighbor-walk":
self._handle_neighbor_walk()
else:
self._send(404, "text/plain", b"Not found")
def _handle_ping(self):
"""Ping a target IP to check reachability."""
length = int(self.headers.get("Content-Length", 0))
body = json.loads(self.rfile.read(length)) if length else {}
target = body.get("target", "").strip()
if not target:
self._send_json(400, {"error": "target IP required"})
return
ip_re = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")
if not ip_re.match(target):
self._send_json(400, {"error": f"Invalid IP address: {target}"})
return
try:
result = subprocess.run(
["ping", "-c", "1", "-W", "2", target],
capture_output=True, text=True, timeout=5,
)
reachable = result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
reachable = False
self._send_json(200, {"reachable": reachable, "target": target})
def _handle_walk(self):
"""Start a walk in a background thread."""
current, _ = get_status()
@ -325,6 +680,7 @@ class NIDHandler(BaseHTTPRequestHandler):
body = json.loads(self.rfile.read(length)) if length else {}
target = body.get("target", "").strip()
mode = body.get("mode", "targeted").strip()
policies = body.get("policies", True)
if not target:
self._send_json(400, {"error": "target IP required"})
@ -334,10 +690,79 @@ class NIDHandler(BaseHTTPRequestHandler):
return
set_status("walking", message="Starting walk...", progress=1)
thread = threading.Thread(target=run_walk, args=(target, mode), daemon=True)
thread = threading.Thread(target=run_walk, args=(target, mode, policies), daemon=True)
thread.start()
self._send_json(200, {"status": "started", "target": target, "mode": mode})
def _handle_neighbor_walk(self):
"""Start a neighbor device walk in a background thread."""
length = int(self.headers.get("Content-Length", 0))
body = json.loads(self.rfile.read(length)) if length else {}
target = body.get("target", "").strip()
rem_port_id = body.get("remPortId", "").strip()
rem_sys_name = body.get("remSysName", "").strip()
if not target or not rem_port_id:
self._send_json(400, {"error": "target and remPortId required"})
return
ip_re = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")
if not ip_re.match(target):
self._send_json(400, {"error": f"Invalid IP address: {target}"})
return
# Check if already walking this neighbor
with neighbor_lock:
ns = neighbor_status.get(target, {})
if ns.get("state") == "walking":
self._send_json(409, {"error": f"Already walking {target}"})
return
thread = threading.Thread(
target=run_neighbor_walk,
args=(target, rem_port_id, rem_sys_name),
daemon=True,
)
thread.start()
self._send_json(200, {"status": "started", "target": target})
def _handle_neighbor_data(self):
"""Return the latest neighbor monitoring JSON for a given target IP."""
# Parse ?target=x.x.x.x from query string
from urllib.parse import urlparse, parse_qs
qs = parse_qs(urlparse(self.path).query)
target = qs.get("target", [None])[0]
if not target:
self._send_json(400, {"error": "target query param required"})
return
with neighbor_lock:
json_path = latest_neighbor.get(target)
if not json_path or not json_path.is_file():
self._send_json(404, {"error": f"No neighbor data for {target}"})
return
with json_path.open() as f:
data = json.load(f)
self._send_json(200, data)
def _handle_neighbor_status(self):
"""Return the current walk status for a neighbor target."""
from urllib.parse import urlparse, parse_qs
qs = parse_qs(urlparse(self.path).query)
target = qs.get("target", [None])[0]
if not target:
self._send_json(400, {"error": "target query param required"})
return
with neighbor_lock:
status = neighbor_status.get(target, {"state": "idle", "message": ""})
self._send_json(200, status)
def _handle_clear(self):
"""Move all walk data to walks/archive/ and reset state."""
global latest_json

View File

@ -75,6 +75,8 @@ fi
# ── Define OID subtrees ──────────────────────────────────────────────
# Targeted: only what the viewer/parser needs
WALK_POLICIES="${SNMP_WALK_POLICIES:-true}"
TARGETED_OIDS=(
.1.3.6.1.2.1.1 # System (sysDescr, sysName, sysUpTime, …)
.1.3.6.1.2.1.2 # IF-MIB (interface table)
@ -85,13 +87,16 @@ TARGETED_OIDS=(
.1.3.6.1.4.1.22420.1.1 # ACD-DESC-MIB (device identity, connectors, sensors)
.1.3.6.1.4.1.22420.2.1 # ACD-ALARM-MIB
.1.3.6.1.4.1.22420.2.2 # ACD-FILTER-MIB
.1.3.6.1.4.1.22420.2.3 # ACD-POLICY-MIB (L2 policies, stats)
.1.3.6.1.4.1.22420.2.4 # ACD-SFP-MIB (transceiver info/diag)
.1.3.6.1.4.1.22420.2.6 # ACD-REGULATOR-MIB
.1.3.6.1.4.1.22420.2.8 # ACD-SMAP-MIB (CoS profiles)
.1.3.6.1.4.1.22420.2.9 # ACD-PORT-MIB (port config/status)
)
if [[ "$WALK_POLICIES" == "true" ]]; then
TARGETED_OIDS+=(.1.3.6.1.4.1.22420.2.3) # ACD-POLICY-MIB (~73% of all OIDs)
fi
# ── Prepare output paths ─────────────────────────────────────────────
TIMESTAMP="$(date +%Y-%m-%d_%H-%M-%S)"
SAFE_IP="${TARGET//./-}"

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff