nid-snmp/snmp-parse.py
sam dfdbd85bf7 Initial commit: SNMP NID Viewer toolkit
SNMP walk parser (snmp-parse.py) with MIB resolution, structured LLDP
neighbor extraction, IP address parsing, and comprehensive table
reconstruction for Accedian AMN-1000-GT-S NIDs.

HTML viewer generator (build_nid_viewer.py) with dark-themed dashboard
including LLDP topology diagram, SFP optics, traffic stats, alarms,
port config, coverage matrix, and policy/filter/regulator sections.

Includes 15 Accedian MIB files and sample walk data from 10.13.60.102.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 10:11:23 -07:00

1418 lines
59 KiB
Python

#!/usr/bin/env python3
"""
SNMP Walk Parser with Full MIB-Aware OID Resolution
Parses an snmpwalk output file (net-snmp -On -OQ format) from an
Accedian GT NID and resolves numeric OIDs to human-readable names
using downloaded Accedian MIB files + built-in standard MIB mappings.
Outputs:
- {base}_resolved.json : Full structured output grouped by MIB module
- {base}_monitoring.json : Monitoring-focused subset (SFP optics, alarms, ports, system)
- {base}_tables.csv : Reconstructed SNMP tables as flat CSV with named columns
Usage:
python3 snmp-parse.py [walk_file]
If no walk_file is given, defaults to the walk in this directory.
"""
import json
import re
import sys
import csv
from pathlib import Path
from collections import defaultdict, OrderedDict
# ────────────────────────────────────────────────────────────────────────
# CONFIG
# ────────────────────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
WALKS_DIR = SCRIPT_DIR / "walks"
DEFAULT_WALK = WALKS_DIR / "10-13-60-102_2026-02-27_11-23-07_walk.txt"
MIB_DIR = SCRIPT_DIR / "mibs" / "accedian"
# ────────────────────────────────────────────────────────────────────────
# STANDARD MIB NAME MAP (RFC 1213, IF-MIB, SNMPv2-MIB, etc.)
# Covers the ~18% of OIDs under .1.3.6.1.2.1 and .1.3.6.1.6
# ────────────────────────────────────────────────────────────────────────
STANDARD_OID_MAP = {
# ── SNMPv2-MIB / RFC 1213: System group ──
".1.3.6.1.2.1.1.1": "sysDescr",
".1.3.6.1.2.1.1.2": "sysObjectID",
".1.3.6.1.2.1.1.3": "sysUpTime",
".1.3.6.1.2.1.1.4": "sysContact",
".1.3.6.1.2.1.1.5": "sysName",
".1.3.6.1.2.1.1.6": "sysLocation",
".1.3.6.1.2.1.1.7": "sysServices",
".1.3.6.1.2.1.1.8": "sysORLastChange",
".1.3.6.1.2.1.1.9": "sysORTable",
".1.3.6.1.2.1.1.9.1.1": "sysORIndex",
".1.3.6.1.2.1.1.9.1.2": "sysORID",
".1.3.6.1.2.1.1.9.1.3": "sysORDescr",
".1.3.6.1.2.1.1.9.1.4": "sysORUpTime",
# ── IF-MIB: Interfaces group ──
".1.3.6.1.2.1.2.1": "ifNumber",
".1.3.6.1.2.1.2.2": "ifTable",
".1.3.6.1.2.1.2.2.1.1": "ifIndex",
".1.3.6.1.2.1.2.2.1.2": "ifDescr",
".1.3.6.1.2.1.2.2.1.3": "ifType",
".1.3.6.1.2.1.2.2.1.4": "ifMtu",
".1.3.6.1.2.1.2.2.1.5": "ifSpeed",
".1.3.6.1.2.1.2.2.1.6": "ifPhysAddress",
".1.3.6.1.2.1.2.2.1.7": "ifAdminStatus",
".1.3.6.1.2.1.2.2.1.8": "ifOperStatus",
".1.3.6.1.2.1.2.2.1.9": "ifLastChange",
".1.3.6.1.2.1.2.2.1.10": "ifInOctets",
".1.3.6.1.2.1.2.2.1.11": "ifInUcastPkts",
".1.3.6.1.2.1.2.2.1.12": "ifInNUcastPkts",
".1.3.6.1.2.1.2.2.1.13": "ifInDiscards",
".1.3.6.1.2.1.2.2.1.14": "ifInErrors",
".1.3.6.1.2.1.2.2.1.15": "ifInUnknownProtos",
".1.3.6.1.2.1.2.2.1.16": "ifOutOctets",
".1.3.6.1.2.1.2.2.1.17": "ifOutUcastPkts",
".1.3.6.1.2.1.2.2.1.18": "ifOutNUcastPkts",
".1.3.6.1.2.1.2.2.1.19": "ifOutDiscards",
".1.3.6.1.2.1.2.2.1.20": "ifOutErrors",
".1.3.6.1.2.1.2.2.1.21": "ifOutQLen",
".1.3.6.1.2.1.2.2.1.22": "ifSpecific",
# ── IF-MIB: ifXTable (extended interface stats) ──
".1.3.6.1.2.1.31.1.1.1.1": "ifName",
".1.3.6.1.2.1.31.1.1.1.2": "ifInMulticastPkts",
".1.3.6.1.2.1.31.1.1.1.3": "ifInBroadcastPkts",
".1.3.6.1.2.1.31.1.1.1.4": "ifOutMulticastPkts",
".1.3.6.1.2.1.31.1.1.1.5": "ifOutBroadcastPkts",
".1.3.6.1.2.1.31.1.1.1.6": "ifHCInOctets",
".1.3.6.1.2.1.31.1.1.1.7": "ifHCInUcastPkts",
".1.3.6.1.2.1.31.1.1.1.8": "ifHCInMulticastPkts",
".1.3.6.1.2.1.31.1.1.1.9": "ifHCInBroadcastPkts",
".1.3.6.1.2.1.31.1.1.1.10": "ifHCOutOctets",
".1.3.6.1.2.1.31.1.1.1.11": "ifHCOutUcastPkts",
".1.3.6.1.2.1.31.1.1.1.12": "ifHCOutMulticastPkts",
".1.3.6.1.2.1.31.1.1.1.13": "ifHCOutBroadcastPkts",
".1.3.6.1.2.1.31.1.1.1.14": "ifLinkUpDownTrapEnable",
".1.3.6.1.2.1.31.1.1.1.15": "ifHighSpeed",
".1.3.6.1.2.1.31.1.1.1.16": "ifPromiscuousMode",
".1.3.6.1.2.1.31.1.1.1.17": "ifConnectorPresent",
".1.3.6.1.2.1.31.1.1.1.18": "ifAlias",
".1.3.6.1.2.1.31.1.1.1.19": "ifCounterDiscontinuityTime",
".1.3.6.1.2.1.31.1.5": "ifTableLastChange",
# ── IP-MIB ──
".1.3.6.1.2.1.4.1": "ipForwarding",
".1.3.6.1.2.1.4.2": "ipDefaultTTL",
".1.3.6.1.2.1.4.3": "ipInReceives",
".1.3.6.1.2.1.4.4": "ipInHdrErrors",
".1.3.6.1.2.1.4.5": "ipInAddrErrors",
".1.3.6.1.2.1.4.6": "ipForwDatagrams",
".1.3.6.1.2.1.4.7": "ipInUnknownProtos",
".1.3.6.1.2.1.4.8": "ipInDiscards",
".1.3.6.1.2.1.4.9": "ipInDelivers",
".1.3.6.1.2.1.4.10": "ipOutRequests",
".1.3.6.1.2.1.4.11": "ipOutDiscards",
".1.3.6.1.2.1.4.12": "ipOutNoRoutes",
".1.3.6.1.2.1.4.13": "ipReasmTimeout",
".1.3.6.1.2.1.4.20": "ipAddrTable",
".1.3.6.1.2.1.4.20.1.1": "ipAdEntAddr",
".1.3.6.1.2.1.4.20.1.2": "ipAdEntIfIndex",
".1.3.6.1.2.1.4.20.1.3": "ipAdEntNetMask",
".1.3.6.1.2.1.4.20.1.4": "ipAdEntBcastAddr",
".1.3.6.1.2.1.4.20.1.5": "ipAdEntReasmMaxSize",
".1.3.6.1.2.1.4.21": "ipRouteTable",
".1.3.6.1.2.1.4.22": "ipNetToMediaTable",
".1.3.6.1.2.1.4.24": "ipForward",
".1.3.6.1.2.1.4.31": "ipTrafficStats",
".1.3.6.1.2.1.4.32": "ipAddressPrefixTable",
".1.3.6.1.2.1.4.34": "ipAddressTable",
".1.3.6.1.2.1.4.34.1.3": "ipAddressIfIndex",
".1.3.6.1.2.1.4.34.1.4": "ipAddressType",
".1.3.6.1.2.1.4.34.1.5": "ipAddressPrefix",
".1.3.6.1.2.1.4.34.1.6": "ipAddressOrigin",
".1.3.6.1.2.1.4.34.1.7": "ipAddressStatus",
".1.3.6.1.2.1.4.34.1.8": "ipAddressCreated",
".1.3.6.1.2.1.4.34.1.9": "ipAddressLastChanged",
".1.3.6.1.2.1.4.34.1.10": "ipAddressRowStatus",
".1.3.6.1.2.1.4.34.1.11": "ipAddressStorageType",
".1.3.6.1.2.1.4.35": "ipNetToPhysicalTable",
".1.3.6.1.2.1.4.36": "ipv6ScopeZoneIndexTable",
# ── TCP-MIB ──
".1.3.6.1.2.1.6.1": "tcpRtoAlgorithm",
".1.3.6.1.2.1.6.2": "tcpRtoMin",
".1.3.6.1.2.1.6.3": "tcpRtoMax",
".1.3.6.1.2.1.6.4": "tcpMaxConn",
".1.3.6.1.2.1.6.5": "tcpActiveOpens",
".1.3.6.1.2.1.6.6": "tcpPassiveOpens",
".1.3.6.1.2.1.6.7": "tcpAttemptFails",
".1.3.6.1.2.1.6.8": "tcpEstabResets",
".1.3.6.1.2.1.6.9": "tcpCurrEstab",
".1.3.6.1.2.1.6.10": "tcpInSegs",
".1.3.6.1.2.1.6.11": "tcpOutSegs",
".1.3.6.1.2.1.6.12": "tcpRetransSegs",
".1.3.6.1.2.1.6.13": "tcpConnTable",
".1.3.6.1.2.1.6.14": "tcpInErrs",
".1.3.6.1.2.1.6.15": "tcpOutRsts",
".1.3.6.1.2.1.6.19": "tcpConnectionTable",
".1.3.6.1.2.1.6.20": "tcpListenerTable",
".1.3.6.1.2.1.49": "tcpMIB",
# ── UDP-MIB ──
".1.3.6.1.2.1.7.1": "udpInDatagrams",
".1.3.6.1.2.1.7.2": "udpNoPorts",
".1.3.6.1.2.1.7.3": "udpInErrors",
".1.3.6.1.2.1.7.4": "udpOutDatagrams",
".1.3.6.1.2.1.7.7": "udpEndpointTable",
".1.3.6.1.2.1.50": "udpMIB",
# ── SNMP group ──
".1.3.6.1.2.1.11.1": "snmpInPkts",
".1.3.6.1.2.1.11.2": "snmpOutPkts",
".1.3.6.1.2.1.11.3": "snmpInBadVersions",
".1.3.6.1.2.1.11.4": "snmpInBadCommunityNames",
".1.3.6.1.2.1.11.5": "snmpInBadCommunityUses",
".1.3.6.1.2.1.11.6": "snmpInASNParseErrs",
".1.3.6.1.2.1.11.8": "snmpInTooBigs",
".1.3.6.1.2.1.11.9": "snmpInNoSuchNames",
".1.3.6.1.2.1.11.10": "snmpInBadValues",
".1.3.6.1.2.1.11.11": "snmpInReadOnlys",
".1.3.6.1.2.1.11.12": "snmpInGenErrs",
".1.3.6.1.2.1.11.13": "snmpInTotalReqVars",
".1.3.6.1.2.1.11.14": "snmpInTotalSetVars",
".1.3.6.1.2.1.11.15": "snmpInGetRequests",
".1.3.6.1.2.1.11.16": "snmpInGetNexts",
".1.3.6.1.2.1.11.17": "snmpInSetRequests",
".1.3.6.1.2.1.11.18": "snmpInGetResponses",
".1.3.6.1.2.1.11.19": "snmpInTraps",
".1.3.6.1.2.1.11.20": "snmpOutTooBigs",
".1.3.6.1.2.1.11.21": "snmpOutNoSuchNames",
".1.3.6.1.2.1.11.22": "snmpOutBadValues",
".1.3.6.1.2.1.11.24": "snmpOutGenErrs",
".1.3.6.1.2.1.11.25": "snmpOutGetRequests",
".1.3.6.1.2.1.11.26": "snmpOutGetNexts",
".1.3.6.1.2.1.11.28": "snmpOutGetResponses",
".1.3.6.1.2.1.11.29": "snmpOutTraps",
".1.3.6.1.2.1.11.30": "snmpEnableAuthenTraps",
".1.3.6.1.2.1.11.31": "snmpSilentDrops",
".1.3.6.1.2.1.11.32": "snmpProxyDrops",
# ── ENTITY-MIB (.1.3.6.1.2.1.47) ──
".1.3.6.1.2.1.47.1.1.1.1.2": "entPhysicalDescr",
".1.3.6.1.2.1.47.1.1.1.1.3": "entPhysicalVendorType",
".1.3.6.1.2.1.47.1.1.1.1.4": "entPhysicalContainedIn",
".1.3.6.1.2.1.47.1.1.1.1.5": "entPhysicalClass",
".1.3.6.1.2.1.47.1.1.1.1.7": "entPhysicalName",
# ── IPv6-MIB (.1.3.6.1.2.1.55) ──
".1.3.6.1.2.1.55.1.1": "ipv6Forwarding",
".1.3.6.1.2.1.55.1.2": "ipv6DefaultHopLimit",
".1.3.6.1.2.1.55.1.3": "ipv6Interfaces",
".1.3.6.1.2.1.55.1.5": "ipv6IfTable",
".1.3.6.1.2.1.55.1.5.1.2": "ipv6IfDescr",
".1.3.6.1.2.1.55.1.5.1.3": "ipv6IfLowerLayer",
".1.3.6.1.2.1.55.1.5.1.4": "ipv6IfEffectiveMtu",
".1.3.6.1.2.1.55.1.5.1.8": "ipv6IfPhysicalAddress",
".1.3.6.1.2.1.55.1.5.1.9": "ipv6IfAdminStatus",
".1.3.6.1.2.1.55.1.5.1.10": "ipv6IfOperStatus",
# ── LAG-MIB (.1.3.6.1.2.1.32) ── (partial, IEEE 802.3ad)
".1.3.6.1.2.1.32": "dot3adAgg",
# ── Notification log (.1.3.6.1.2.1.92) ──
".1.3.6.1.2.1.92": "notificationLogMIB",
# ── IEEE 802.1 Bridge / LLDP ──
".1.3.111.2.802.1.1.8": "ieee8021BridgeMIB",
".1.3.111.2.802.1.1.13": "lldpMIB",
# ── SNMPv2 framework ──
".1.3.6.1.6.3.1": "snmpFrameworkMIB",
".1.3.6.1.6.3.13.3.1.3": "snmpNotifyFilterMIB",
".1.3.6.1.6.3.16.2.2.1": "vacmAccessTable",
}
# Module-level OID prefix labels for grouping
STD_MODULE_MAP = OrderedDict([
(".1.3.6.1.2.1.1", "SNMPv2-MIB::system"),
(".1.3.6.1.2.1.2", "IF-MIB::interfaces"),
(".1.3.6.1.2.1.4", "IP-MIB"),
(".1.3.6.1.2.1.6", "TCP-MIB"),
(".1.3.6.1.2.1.7", "UDP-MIB"),
(".1.3.6.1.2.1.11", "SNMPv2-MIB::snmp"),
(".1.3.6.1.2.1.31", "IF-MIB::ifXTable"),
(".1.3.6.1.2.1.47", "ENTITY-MIB"),
(".1.3.6.1.2.1.49", "TCP-MIB"),
(".1.3.6.1.2.1.50", "UDP-MIB"),
(".1.3.6.1.2.1.55", "IPv6-MIB"),
(".1.3.6.1.2.1.92", "NOTIFICATION-LOG-MIB"),
(".1.3.111.2.802.1", "IEEE-802.1"),
(".1.3.6.1.6.3", "SNMPv3-FRAMEWORK"),
])
# ────────────────────────────────────────────────────────────────────────
# MIB FILE PARSER — extracts OBJECT-TYPE definitions from .mib files
# ────────────────────────────────────────────────────────────────────────
def parse_mib_files(mib_dir: Path) -> dict:
"""
Parse all MIB files in mib_dir and return a mapping of
numeric_oid (str) -> {name, description, syntax, max_access, parent_table}
"""
if not mib_dir.is_dir():
print(f" Warning: MIB directory not found: {mib_dir}", file=sys.stderr)
return {}
# Step 1: Extract all name -> (parent_name, number) assignments
name_to_parent = {} # name -> (parent_name, sub_id)
name_to_info = {} # name -> {description, syntax, max_access, ...}
# Known roots from ACCEDIAN-SMI
name_to_parent["enterprises"] = ("_root", None)
name_to_parent["accedianMIB"] = ("enterprises", 22420)
name_to_parent["acdProducts"] = ("accedianMIB", 1)
name_to_parent["acdMibs"] = ("accedianMIB", 2)
name_to_parent["acdTraps"] = ("accedianMIB", 3)
name_to_parent["acdExperiment"] = ("accedianMIB", 4)
name_to_parent["acdServices"] = ("accedianMIB", 5)
# Well-known standard roots
name_to_parent["sysName"] = ("_std", None)
mib_files = sorted(mib_dir.glob("*"))
for mib_file in mib_files:
if mib_file.name == "ACCEDIAN-SMI":
continue # Already handled above
text = mib_file.read_text(encoding="utf-8", errors="replace")
_parse_single_mib(text, name_to_parent, name_to_info)
# Step 2: Resolve each name to its full numeric OID
oid_map = {}
resolved_cache = {}
def resolve(name):
if name in resolved_cache:
return resolved_cache[name]
if name == "_root" or name == "_std":
return None
if name not in name_to_parent:
return None
parent_name, sub_id = name_to_parent[name]
if sub_id is None:
return None
parent_oid = resolve(parent_name)
if parent_oid is None:
# Parent is enterprises
if parent_name == "enterprises":
result = f".1.3.6.1.4.1.{sub_id}"
else:
return None
else:
result = f"{parent_oid}.{sub_id}"
resolved_cache[name] = result
return result
for name in name_to_parent:
oid = resolve(name)
if oid:
info = name_to_info.get(name, {})
info["name"] = name
oid_map[oid] = info
return oid_map
def _parse_single_mib(text: str, name_to_parent: dict, name_to_info: dict):
"""Extract OBJECT-TYPE, MODULE-IDENTITY, and OBJECT IDENTIFIER assignments."""
# Remove single-line comments (-- to end of line) but NOT inside quoted strings.
# MIB comments start with -- and go to end of line.
# Be careful not to strip inside DESCRIPTION "..." blocks.
# Simple approach: strip comments outside of quotes.
lines = text.split('\n')
cleaned_lines = []
in_quotes = False
for line in lines:
cleaned = []
i = 0
while i < len(line):
if line[i] == '"':
in_quotes = not in_quotes
cleaned.append(line[i])
elif not in_quotes and line[i:i+2] == '--':
break # Rest of line is comment
else:
cleaned.append(line[i])
i += 1
cleaned_lines.append(''.join(cleaned))
text = '\n'.join(cleaned_lines)
# Strategy: find ALL "::= { parentName number }" assignments and
# look backwards to find the object name.
# 1. Simple OBJECT IDENTIFIER assignments: name OBJECT IDENTIFIER ::= { parent num }
simple_oid_pattern = re.compile(
r'(\w+)\s+OBJECT\s+IDENTIFIER\s*::=\s*\{\s*(\w+)\s+(\d+)\s*\}'
)
for m in simple_oid_pattern.finditer(text):
name, parent, num = m.group(1), m.group(2), int(m.group(3))
name_to_parent[name] = (parent, num)
# 2. MODULE-IDENTITY assignments
mod_pattern = re.compile(
r'(\w+)\s+MODULE-IDENTITY\s.*?::=\s*\{\s*(\w+)\s+(\d+)\s*\}',
re.DOTALL
)
for m in mod_pattern.finditer(text):
name, parent, num = m.group(1), m.group(2), int(m.group(3))
name_to_parent[name] = (parent, num)
# 3. OBJECT-TYPE / OBJECT-IDENTITY / NOTIFICATION-TYPE assignments
# Use a targeted approach: find all ::= { parent num } and look back for the name
assign_pattern = re.compile(r'::=\s*\{\s*(\w+)\s+(\d+)\s*\}')
for m in assign_pattern.finditer(text):
parent = m.group(1)
num = int(m.group(2))
# Look backwards from match start to find the object name
before = text[:m.start()].rstrip()
# Find the last OBJECT-TYPE, OBJECT-IDENTITY, or similar keyword
# then the name is the word before that keyword
name_match = re.search(
r'(\w+)\s+(?:OBJECT-TYPE|OBJECT-IDENTITY|MODULE-IDENTITY|NOTIFICATION-TYPE)'
r'\s*$',
before, re.DOTALL
)
if name_match:
name = name_match.group(1)
name_to_parent[name] = (parent, num)
else:
# Could be a SEQUENCE member or OBJECT IDENTIFIER (already caught above)
# Try: name OBJECT IDENTIFIER ::= { parent num }
oid_match = re.search(r'(\w+)\s+OBJECT\s+IDENTIFIER\s*$', before)
if oid_match:
name = oid_match.group(1)
name_to_parent[name] = (parent, num)
# 4. Extract detailed info (SYNTAX, DESCRIPTION, MAX-ACCESS) for OBJECT-TYPE
obj_pattern = re.compile(
r'(\w+)\s+OBJECT-TYPE\s+'
r'SYNTAX\s+(.*?)\s+'
r'(?:UNITS\s+"(.*?)"\s+)?'
r'MAX-ACCESS\s+(\S+)\s+'
r'STATUS\s+\S+\s+'
r'DESCRIPTION\s+"(.*?)"',
re.DOTALL
)
for m in obj_pattern.finditer(text):
name = m.group(1)
syntax_raw = m.group(2).strip()
units = m.group(3)
max_access = m.group(4)
desc = m.group(5).strip()
# Clean up syntax (take first word)
syntax = syntax_raw.split()[0] if syntax_raw else ""
# Clean description (collapse whitespace)
desc = re.sub(r'\s+', ' ', desc).strip()
name_to_info[name] = {
"syntax": syntax,
"units": units,
"max_access": max_access,
"description": desc[:200], # Truncate long descriptions
}
# ────────────────────────────────────────────────────────────────────────
# WALK FILE PARSER
# ────────────────────────────────────────────────────────────────────────
def parse_walk_file(walk_file: Path) -> dict:
"""
Parse entire walk file into {oid: value} dict.
Handles multi-line quoted values (e.g., "-inf dBm\\n").
"""
data = OrderedDict()
skipped = 0
pending_oid = None
pending_value = None
oid_re = re.compile(r'^(\.\d+(?:\.\d+)*)\s+=\s+(.*)$')
with walk_file.open(encoding="utf-8", errors="replace") as f:
for line in f:
raw = line.rstrip('\n\r')
stripped = raw.strip()
if not stripped or "No more variables left" in stripped:
skipped += 1
continue
# If we're accumulating a multi-line quoted value
if pending_oid is not None:
pending_value += " " + stripped
# Check if the closing quote arrived
if stripped.endswith('"'):
# Strip surrounding quotes
val = pending_value.strip()
if val.startswith('"') and val.endswith('"'):
val = val[1:-1]
data[pending_oid] = val.strip()
pending_oid = None
pending_value = None
continue
match = oid_re.match(stripped)
if not match:
skipped += 1
continue
oid, value = match.groups()
value = value.strip()
# Check for multi-line quoted value (opening " but no closing ")
if value.startswith('"') and not value.endswith('"'):
pending_oid = oid
pending_value = value
continue
# Normal single-line value
if value.startswith('"') and value.endswith('"'):
value = value[1:-1]
data[oid] = value.strip()
# Flush any pending value
if pending_oid is not None:
val = pending_value.strip().strip('"')
data[pending_oid] = val.strip()
return data, skipped
# ────────────────────────────────────────────────────────────────────────
# OID RESOLVER — matches numeric OIDs to names
# ────────────────────────────────────────────────────────────────────────
class OIDResolver:
def __init__(self, accedian_map: dict, standard_map: dict):
# Merge both maps: numeric_oid -> info dict
self.oid_map = {}
for oid, name in standard_map.items():
self.oid_map[oid] = {"name": name}
for oid, info in accedian_map.items():
self.oid_map[oid] = info
# Build sorted list of known OIDs for longest-prefix matching
self._sorted_oids = sorted(self.oid_map.keys(),
key=lambda x: len(x), reverse=True)
def resolve(self, oid: str):
"""
Resolve a numeric OID to (column_name, index_suffix, info_dict).
Uses longest-prefix matching: finds the longest known OID prefix,
then treats the remainder as the table index.
"""
# Exact match first
if oid in self.oid_map:
return self.oid_map[oid]["name"], "", self.oid_map[oid]
# Longest prefix match
for known_oid in self._sorted_oids:
if oid.startswith(known_oid + "."):
suffix = oid[len(known_oid) + 1:]
return self.oid_map[known_oid]["name"], suffix, self.oid_map[known_oid]
return None, None, None
def get_module(self, oid: str) -> str:
"""Determine which MIB module an OID belongs to."""
# Check Accedian modules first (sorted longest-prefix-first)
accedian_modules = {
".1.3.6.1.4.1.22420.1.1": "ACD-DESC-MIB",
".1.3.6.1.4.1.22420.2.1": "ACD-ALARM-MIB",
".1.3.6.1.4.1.22420.2.2": "ACD-FILTER-MIB",
".1.3.6.1.4.1.22420.2.3": "ACD-POLICY-MIB",
".1.3.6.1.4.1.22420.2.4": "ACD-SFP-MIB",
".1.3.6.1.4.1.22420.2.5": "ACD-PAA-MIB",
".1.3.6.1.4.1.22420.2.6": "ACD-REGULATOR-MIB",
".1.3.6.1.4.1.22420.2.7": "ACD-CFM-MIB",
".1.3.6.1.4.1.22420.2.8": "ACD-SMAP-MIB",
".1.3.6.1.4.1.22420.2.9": "ACD-PORT-MIB",
".1.3.6.1.4.1.22420.2.10": "ACD-SHAPER-MIB",
".1.3.6.1.4.1.22420.2.11": "ACD-DISCOVERY-MIB",
".1.3.6.1.4.1.22420.2.12": "ACD-SA-MIB",
".1.3.6.1.4.1.22420.2.14": "ACD-TID-MIB",
}
for prefix, module in sorted(accedian_modules.items(),
key=lambda x: len(x[0]), reverse=True):
if oid == prefix or oid.startswith(prefix + "."):
return module
# Check standard modules (sorted longest-prefix-first to avoid
# .1.3.6.1.2.1.1 matching .1.3.6.1.2.1.11)
for prefix, module in sorted(STD_MODULE_MAP.items(),
key=lambda x: len(x[0]), reverse=True):
if oid == prefix or oid.startswith(prefix + "."):
return module
if oid.startswith(".1.3.6.1.4.1.22420"):
return "ACCEDIAN-ENTERPRISE"
if oid.startswith(".1.3.6.1.2.1"):
return "STD-MIB"
return "UNKNOWN"
# ────────────────────────────────────────────────────────────────────────
# TABLE RECONSTRUCTOR — groups OIDs into SNMP tables
# ────────────────────────────────────────────────────────────────────────
def reconstruct_tables(walk_data: dict, resolver: OIDResolver) -> dict:
"""
Group walk data into SNMP tables.
Returns: {table_key: {index: {column_name: value}}}
"""
tables = defaultdict(lambda: defaultdict(dict))
scalars = OrderedDict()
unresolved = OrderedDict()
for oid, value in walk_data.items():
col_name, index, info = resolver.resolve(oid)
if col_name is None:
# Unresolved — store with module grouping
module = resolver.get_module(oid)
unresolved[oid] = {"module": module, "value": value}
continue
if not index:
# Scalar value (no index suffix) — e.g., sysDescr.0
scalars[col_name] = value
else:
# Table row — group by column name prefix
tables[col_name][index] = value
return dict(tables), scalars, dict(unresolved)
# ────────────────────────────────────────────────────────────────────────
# OUTPUT BUILDERS
# ────────────────────────────────────────────────────────────────────────
def build_resolved_output(walk_data: dict, resolver: OIDResolver) -> dict:
"""Build the full resolved JSON output grouped by MIB module."""
modules = defaultdict(lambda: {"scalars": {}, "tables": defaultdict(dict)})
for oid, value in walk_data.items():
module = resolver.get_module(oid)
col_name, index, info = resolver.resolve(oid)
if col_name is None:
# Unresolved — store raw
if "unresolved" not in modules[module]:
modules[module]["unresolved"] = {}
modules[module]["unresolved"][oid] = value
continue
if not index:
# Scalar — e.g., .0 suffix was already stripped by walk
# Check if there's a .0 variant
modules[module]["scalars"][col_name] = value
else:
# Table entry
table_key = col_name
modules[module]["tables"][table_key][index] = value
# Convert to regular dicts for JSON serialization
result = OrderedDict()
for mod_name in sorted(modules.keys()):
mod_data = modules[mod_name]
entry = OrderedDict()
if mod_data["scalars"]:
entry["scalars"] = dict(mod_data["scalars"])
if mod_data["tables"]:
entry["tables"] = {}
for tbl_name, rows in sorted(mod_data["tables"].items()):
entry["tables"][tbl_name] = dict(rows)
if "unresolved" in mod_data and mod_data["unresolved"]:
entry["unresolved_oids"] = mod_data["unresolved"]
result[mod_name] = entry
return result
def build_monitoring_output(walk_data: dict, resolver: OIDResolver) -> dict:
"""
Build a monitoring-focused JSON with the most useful data for
device monitoring and config status.
"""
output = OrderedDict()
# ── 1. Device Identity ──
device = OrderedDict()
identity_oids = {
".1.3.6.1.2.1.1.1.0": "sysDescr",
".1.3.6.1.2.1.1.3.0": "sysUpTime",
".1.3.6.1.2.1.1.4.0": "sysContact",
".1.3.6.1.2.1.1.5.0": "sysName",
".1.3.6.1.2.1.1.6.0": "sysLocation",
".1.3.6.1.4.1.22420.1.1.1.0": "commercialName",
".1.3.6.1.4.1.22420.1.1.2.0": "macBaseAddr",
".1.3.6.1.4.1.22420.1.1.3.0": "identifier",
".1.3.6.1.4.1.22420.1.1.4.0": "firmwareVersion",
".1.3.6.1.4.1.22420.1.1.5.0": "hardwareVersion",
".1.3.6.1.4.1.22420.1.1.6.0": "serialNumber",
".1.3.6.1.4.1.22420.1.1.7.0": "hardwareOptions",
".1.3.6.1.4.1.22420.1.1.20.0": "cpuUsageCurrent",
".1.3.6.1.4.1.22420.1.1.21.0": "cpuUsageAvg15s",
".1.3.6.1.4.1.22420.1.1.22.0": "cpuUsageAvg30s",
".1.3.6.1.4.1.22420.1.1.23.0": "cpuUsageAvg60s",
".1.3.6.1.4.1.22420.1.1.24.0": "cpuUsageAvg900s",
".1.3.6.1.4.1.22420.1.1.25.0": "uptimeSeconds",
}
for oid, label in identity_oids.items():
if oid in walk_data:
device[label] = walk_data[oid]
output["device"] = device
# ── 2. Interfaces ──
interfaces = OrderedDict()
if_columns = {
".1.3.6.1.2.1.2.2.1.2": "ifDescr",
".1.3.6.1.2.1.2.2.1.3": "ifType",
".1.3.6.1.2.1.2.2.1.4": "ifMtu",
".1.3.6.1.2.1.2.2.1.5": "ifSpeed",
".1.3.6.1.2.1.2.2.1.6": "ifPhysAddress",
".1.3.6.1.2.1.2.2.1.7": "ifAdminStatus",
".1.3.6.1.2.1.2.2.1.8": "ifOperStatus",
".1.3.6.1.2.1.2.2.1.10": "ifInOctets",
".1.3.6.1.2.1.2.2.1.13": "ifInDiscards",
".1.3.6.1.2.1.2.2.1.14": "ifInErrors",
".1.3.6.1.2.1.2.2.1.16": "ifOutOctets",
".1.3.6.1.2.1.2.2.1.19": "ifOutDiscards",
".1.3.6.1.2.1.2.2.1.20": "ifOutErrors",
".1.3.6.1.2.1.31.1.1.1.1": "ifName",
".1.3.6.1.2.1.31.1.1.1.6": "ifHCInOctets",
".1.3.6.1.2.1.31.1.1.1.10": "ifHCOutOctets",
".1.3.6.1.2.1.31.1.1.1.15": "ifHighSpeed",
".1.3.6.1.2.1.31.1.1.1.18": "ifAlias",
}
for prefix, col_name in if_columns.items():
for oid, value in walk_data.items():
if oid.startswith(prefix + "."):
idx = oid[len(prefix) + 1:]
if idx not in interfaces:
interfaces[idx] = OrderedDict()
interfaces[idx][col_name] = value
output["interfaces"] = interfaces
# ── 3. Connectors (ACD-DESC-MIB) ──
connectors = _extract_table(walk_data, ".1.3.6.1.4.1.22420.1.1.10.1", {
"1": "id", "2": "name", "3": "type", "4": "poeSupport",
})
if connectors:
output["connectors"] = connectors
# ── 4. Power Supplies (ACD-DESC-MIB) ──
power = _extract_table(walk_data, ".1.3.6.1.4.1.22420.1.1.11.1", {
"1": "id", "2": "name", "3": "type", "4": "present",
})
if power:
output["power_supplies"] = power
# ── 5. Temperature Sensors (ACD-DESC-MIB) ──
temps = _extract_table(walk_data, ".1.3.6.1.4.1.22420.1.1.12.1", {
"1": "id", "2": "currentTemp", "3": "highThreshold",
"4": "highAlarmEnabled", "5": "criticalThreshold",
"6": "criticalAlarmEnabled", "7": "label",
})
if temps:
output["temperature_sensors"] = temps
# ── 6. SFP Info (ACD-SFP-MIB) ──
sfp_info = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.4.1.1", {
"1": "id", "2": "connectorIdx", "3": "connectorType",
"4": "vendor", "5": "vendorOui", "6": "vendorPn",
"7": "vendorRev", "8": "wavelength", "9": "serialNum",
"10": "mfgYear", "11": "mfgMonth", "12": "mfgDay",
"13": "lot", "14": "rev8472", "15": "present",
"16": "diagCapable", "17": "internalCal", "18": "alarmCapable",
"19": "idType", "20": "extIdType", "21": "transCode",
})
if sfp_info:
# Decode hex-encoded SFP EEPROM fields to ASCII
hex_fields = ("vendor", "vendorPn", "vendorRev", "vendorOui", "transCode")
for idx, row in sfp_info.items():
for field in hex_fields:
if field in row:
row[field] = _decode_hex_string(row[field])
output["sfp_info"] = sfp_info
# ── 7. SFP Diagnostics (ACD-SFP-MIB) ──
sfp_diag = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.4.2.1", {
"1": "id", "2": "connectorIdx", "3": "temperature",
"4": "supplyVoltage", "5": "laserBiasCurrent",
"6": "txPower_uW", "7": "rxPower_uW",
"8": "txPower_dBm", "9": "rxPower_dBm",
})
if sfp_diag:
output["sfp_diagnostics"] = sfp_diag
# ── 8. SFP Thresholds (ACD-SFP-MIB) ──
sfp_thresh = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.4.3.1", {
"1": "id", "2": "connectorIdx",
"3": "tempHighAlarm", "4": "tempLowAlarm",
"5": "tempHighWarn", "6": "tempLowWarn",
"7": "vccHighAlarm", "8": "vccLowAlarm",
"9": "vccHighWarn", "10": "vccLowWarn",
"11": "lbcHighAlarm", "12": "lbcLowAlarm",
"13": "lbcHighWarn", "14": "lbcLowWarn",
"15": "txPwrHighAlarm", "16": "txPwrLowAlarm",
"17": "txPwrHighWarn", "18": "txPwrLowWarn",
"19": "rxPwrHighAlarm", "20": "rxPwrLowAlarm",
"21": "rxPwrHighWarn", "22": "rxPwrLowWarn",
"23": "txPwrHighAlarm_dBm", "24": "txPwrLowAlarm_dBm",
"25": "txPwrHighWarn_dBm", "26": "txPwrLowWarn_dBm",
"27": "rxPwrHighAlarm_dBm", "28": "rxPwrLowAlarm_dBm",
"29": "rxPwrHighWarn_dBm", "30": "rxPwrLowWarn_dBm",
})
if sfp_thresh:
output["sfp_thresholds"] = sfp_thresh
# ── 9. Alarms Config (ACD-ALARM-MIB) ──
alarm_cfg = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.1.10.1", {
"1": "id", "2": "number", "3": "description",
"4": "enabled", "5": "severity", "6": "serviceAffecting",
"7": "extNumber", "8": "conditionType", "9": "amoType",
"10": "on",
})
if alarm_cfg:
output["alarm_config"] = alarm_cfg
# ── 10. Alarm Status (ACD-ALARM-MIB) ──
alarm_status = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.1.11.1", {
"1": "id", "2": "number", "3": "active",
"4": "lastChange", "5": "message", "6": "on",
})
if alarm_status:
output["alarm_status"] = alarm_status
# ── 11. Alarm General Config ──
alarm_gen = OrderedDict()
alarm_gen_oids = {
".1.3.6.1.4.1.22420.2.1.1.0": "threshOnMs",
".1.3.6.1.4.1.22420.2.1.2.0": "threshOffMs",
".1.3.6.1.4.1.22420.2.1.3.0": "ledEnabled",
".1.3.6.1.4.1.22420.2.1.4.0": "syslogEnabled",
".1.3.6.1.4.1.22420.2.1.5.0": "snmpEnabled",
".1.3.6.1.4.1.22420.2.1.6.0": "802_3ahEnabled",
}
for oid, label in alarm_gen_oids.items():
if oid in walk_data:
alarm_gen[label] = walk_data[oid]
if alarm_gen:
output["alarm_general"] = alarm_gen
# ── 12. Port Config (ACD-PORT-MIB) ──
port_cfg = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.9.1.1.1.1", {
"1": "index", "2": "name", "3": "alias",
"4": "macAddress", "5": "connectorId", "6": "state",
"7": "mtu", "8": "autoNego", "9": "speed",
"10": "duplex", "11": "mdi", "12": "pauseMode",
"13": "advertisement", "14": "forceTxOn", "15": "laserMode",
})
if port_cfg:
output["port_config"] = port_cfg
# ── 13. Port Status (ACD-PORT-MIB) ──
port_status = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.9.1.2.1.1", {
"1": "index", "2": "name", "3": "connectorIdx",
"4": "linkStatus", "5": "speed", "6": "duplex",
"7": "mdi", "8": "pauseMode", "9": "sfpIdx",
})
if port_status:
output["port_status"] = port_status
# ── 14. L2 Filters (ACD-FILTER-MIB) ──
l2_filters = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.2.1.1", {
"1": "id", "2": "name",
"3": "macDstEn", "4": "macDst", "5": "macDstMask",
"6": "macSrcEn", "7": "macSrc", "8": "macSrcMask",
"9": "etypeEn", "10": "etype",
"11": "vlan1IdEn", "12": "vlan1Id",
"13": "vlan1PriorEn", "14": "vlan1Prior",
"15": "vlan1CfiEn", "16": "vlan1Cfi",
"17": "vlan2IdEn", "18": "vlan2Id",
"19": "vlan2PriorEn", "20": "vlan2Prior",
"21": "vlan2CfiEn", "22": "vlan2Cfi",
"23": "rowStatus",
})
if l2_filters:
output["l2_filters"] = l2_filters
# ── 15. Policy Lists (ACD-POLICY-MIB) ──
policy_lists = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.3.5.1.1.1", {
"2": "name", "3": "nbrEntries",
})
if policy_lists:
output["policy_lists"] = policy_lists
# ── 16. Policy→Port Bindings (ACD-POLICY-MIB) ──
policy_ports = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.3.5.2.1.1", {
"2": "policyListId",
})
if policy_ports:
output["policy_port_bindings"] = policy_ports
# ── 17. Policy Entries (ACD-POLICY-MIB) — enabled rules only ──
policy_entries_raw = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.3.1.1", {
"2": "listId", "3": "entryId", "4": "enable",
"5": "filterType", "6": "filterIndex",
"8": "monitorEnable", "9": "monitorIndex",
"10": "regulatorEnable", "11": "regulatorIndex",
"13": "action", "14": "evcMappingEncaps",
"16": "evcMappingVlanId",
"30": "outgoingPort",
})
# Keep only enabled entries to avoid 400-entry dump
policy_entries = OrderedDict()
if policy_entries_raw:
for idx, row in policy_entries_raw.items():
if row.get("enable") == "1":
policy_entries[idx] = row
if policy_entries:
output["policy_entries"] = policy_entries
# ── 18. Policy Traffic Stats (ACD-POLICY-MIB) ──
policy_stats = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.3.2.1", {
"2": "listId", "3": "entryId",
"4": "inPkts", "6": "inHCPkts",
"7": "inOctets", "9": "inHCOctets",
"10": "inPktsErr", "12": "inHCPktsErr",
})
if policy_stats:
output["policy_stats"] = policy_stats
# ── 19. Regulators (ACD-REGULATOR-MIB) ──
regulators = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.6.1.1", {
"2": "name", "3": "cirKbps", "4": "cbsKiB",
"5": "eirKbps", "6": "ebsKiB",
"7": "isBlind", "8": "isCouple", "9": "rowStatus",
"10": "workingRate", "11": "cirMaxKbps", "12": "eirMaxKbps",
})
if regulators:
output["regulators"] = regulators
# ── 20. Regulator Stats (ACD-REGULATOR-MIB) ──
reg_stats = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.6.2.1", {
"4": "acceptHCOctets", "7": "acceptHCPkts", "8": "acceptRateKbps",
"11": "dropHCOctets", "14": "dropHCPkts", "15": "dropRateKbps",
"16": "greenHCOctets", "17": "greenHCPkts",
"18": "yellowHCOctets", "19": "yellowHCPkts",
"20": "redHCOctets", "21": "redHCPkts",
"22": "greenRateKbps", "23": "yellowRateKbps", "24": "redRateKbps",
})
if reg_stats:
output["regulator_stats"] = reg_stats
# ── 21. CoS Profiles (ACD-SMAP-MIB) ──
cos_profiles = _extract_table(walk_data, ".1.3.6.1.4.1.22420.2.8.1.1.1.1", {
"2": "rowStatus", "3": "name", "4": "type",
"5": "decodeDropBit", "6": "encodeDropBit",
})
if cos_profiles:
output["cos_profiles"] = cos_profiles
# ── 22. IP Addresses (ipAddrTable + ipAddressTable) ──
ip_addresses = OrderedDict()
# ipAddrTable: .1.3.6.1.2.1.4.20.1 — columns: .1=addr, .2=ifIndex, .3=mask, .4=bcast
ip_addr_prefix = ".1.3.6.1.2.1.4.20.1."
ip_raw = {} # ip_string -> {field: value}
for oid, value in walk_data.items():
if not oid.startswith(ip_addr_prefix):
continue
rest = oid[len(ip_addr_prefix):]
parts = rest.split(".", 1)
if len(parts) != 2:
continue
col, ip_key = parts[0], parts[1]
if ip_key not in ip_raw:
ip_raw[ip_key] = {}
col_names = {"1": "address", "2": "ifIndex", "3": "netmask", "4": "bcastAddr"}
if col in col_names:
ip_raw[ip_key][col_names[col]] = value
# Also check ipAddressTable for origin/type: .1.3.6.1.2.1.4.34.1
ip_extra_prefix = ".1.3.6.1.2.1.4.34.1."
for oid, value in walk_data.items():
if not oid.startswith(ip_extra_prefix):
continue
rest = oid[len(ip_extra_prefix):]
# Format: col.addrType.addrLen.addr... — addrType 1=IPv4, 2=IPv6
parts = rest.split(".", 2)
if len(parts) < 3:
continue
col, addr_type = parts[0], parts[1]
if addr_type != "1": # Skip IPv6 for now
continue
addr_rest = parts[2]
# For IPv4: next part is "4" (length), then IP octets
addr_parts = addr_rest.split(".", 1)
if len(addr_parts) < 2 or addr_parts[0] != "4":
continue
ip_key = addr_parts[1]
col_names = {"3": "ifIndex2", "4": "type", "6": "origin", "7": "status"}
if col in col_names and ip_key in ip_raw:
ip_raw[ip_key][col_names[col]] = value
# Convert to indexed dict, skip loopback
idx = 1
for ip_key, fields in ip_raw.items():
addr = fields.get("address", ip_key)
if addr == "127.0.0.1":
continue
# Convert netmask to prefix length
mask = fields.get("netmask", "")
prefix_len = ""
if mask:
try:
bits = sum(bin(int(o)).count("1") for o in mask.split("."))
prefix_len = str(bits)
except (ValueError, AttributeError):
pass
ip_addresses[str(idx)] = OrderedDict([
("address", addr),
("prefixLength", prefix_len),
("netmask", mask),
("ifIndex", fields.get("ifIndex", "")),
("origin", fields.get("origin", "")),
("status", fields.get("status", "")),
])
idx += 1
if ip_addresses:
output["ip_addresses"] = ip_addresses
# ── 23. LLDP Neighbors (structured) ──
LLDP_PREFIX = ".1.3.111.2.802.1.1.13"
LLDP_REM_TABLE = LLDP_PREFIX + ".1.4.1.1." # lldpRemTable columns
LLDP_REM_MGMT = LLDP_PREFIX + ".1.4.2.1." # lldpRemManAddrTable
LLDP_STATS_TX = LLDP_PREFIX + ".1.2.6.1.3." # lldpStatsTxPortFramesTotal
LLDP_STATS_RX = LLDP_PREFIX + ".1.2.7.1.5." # lldpStatsRxPortFramesTotal
LLDP_STATS_NB = LLDP_PREFIX + ".1.2.7.1.8." # lldpStatsRxPortNeighbors
# Column names for lldpRemTable
rem_columns = {
"5": "chassisIdSubtype", "6": "chassisId",
"7": "portIdSubtype", "8": "remPortId",
"9": "remPortDesc", "10": "remSysName",
"11": "remSysDesc", "14": "capsSupported", "15": "capsEnabled",
}
# Parse lldpRemTable: OID = ...col.timeMark.localPort.remIdx.extra
rem_entries = {} # key = "localPort.remIdx"
for oid, value in walk_data.items():
if not oid.startswith(LLDP_REM_TABLE):
continue
rest = oid[len(LLDP_REM_TABLE):]
parts = rest.split(".", 4)
if len(parts) < 4:
continue
col, time_mark, local_port, rem_idx = parts[0], parts[1], parts[2], parts[3]
if col not in rem_columns:
continue
key = f"{local_port}.{rem_idx}"
if key not in rem_entries:
rem_entries[key] = {"localPort": local_port, "remIndex": rem_idx}
field = rem_columns[col]
# Hex-decode chassis ID: "9C E1 76 13 80 D9" → "9C:E1:76:13:80:D9"
if field == "chassisId" and " " in value:
value = value.strip().replace(" ", ":")
rem_entries[key][field] = value.strip()
# Parse management addresses from OID index encoding
# OID: ...3.timeMark.localPort.remIdx.extra.addrSubtype.addrLen.octets...
for oid, value in walk_data.items():
if not oid.startswith(LLDP_REM_MGMT + "3."):
continue
rest = oid[len(LLDP_REM_MGMT + "3."):]
parts = rest.split(".")
if len(parts) < 6:
continue
time_mark, local_port, rem_idx, extra = parts[0], parts[1], parts[2], parts[3]
addr_subtype = parts[4]
addr_len = int(parts[5]) if parts[5].isdigit() else 0
addr_octets = parts[6:6 + addr_len]
key = f"{local_port}.{rem_idx}"
if key not in rem_entries:
continue
if addr_subtype == "1" and addr_len == 4:
# IPv4
rem_entries[key]["mgmtIPv4"] = ".".join(addr_octets)
elif addr_subtype == "2" and addr_len == 16:
# IPv6 — group octets into hex pairs and collapse
hex_groups = []
for i in range(0, 16, 2):
high = int(addr_octets[i]) if i < len(addr_octets) else 0
low = int(addr_octets[i + 1]) if i + 1 < len(addr_octets) else 0
hex_groups.append(f"{high:02x}{low:02x}")
ipv6 = ":".join(hex_groups)
# Basic compression: collapse leading zeros per group
ipv6 = ":".join(g.lstrip("0") or "0" for g in ipv6.split(":"))
rem_entries[key]["mgmtIPv6"] = ipv6
# Cross-reference localPort with interfaces to get port name
interfaces_map = output.get("interfaces", {})
for entry in rem_entries.values():
lp = entry.get("localPort", "")
if lp in interfaces_map:
iface = interfaces_map[lp]
entry["localPortName"] = iface.get("ifName") or iface.get("ifDescr", "")
# Build lldp_neighbors as indexed dict
lldp_structured = OrderedDict()
for idx, (key, entry) in enumerate(sorted(rem_entries.items()), 1):
lldp_structured[str(idx)] = entry
if lldp_structured:
output["lldp_neighbors"] = lldp_structured
# Parse per-port LLDP statistics
lldp_stats = OrderedDict()
stat_prefixes = [
(LLDP_STATS_TX, "txFrames"),
(LLDP_STATS_RX, "rxFrames"),
(LLDP_STATS_NB, "neighborsLearned"),
]
for prefix, field_name in stat_prefixes:
for oid, value in walk_data.items():
if not oid.startswith(prefix):
continue
rest = oid[len(prefix):]
# Format: portNum.extra
port_parts = rest.split(".", 1)
port_num = port_parts[0]
if port_num not in lldp_stats:
lldp_stats[port_num] = {}
lldp_stats[port_num][field_name] = value
if lldp_stats:
output["lldp_stats"] = lldp_stats
# Keep raw LLDP dump for backward compatibility
lldp_raw = OrderedDict()
for oid, value in walk_data.items():
if oid.startswith(LLDP_PREFIX):
suffix = oid[len(LLDP_PREFIX):]
lldp_raw[f"lldp{suffix}"] = value
if lldp_raw:
output["lldp_raw"] = lldp_raw
# ── 24. Module OID counts (summary) ──
counts = defaultdict(int)
for oid in walk_data:
module = resolver.get_module(oid)
counts[module] += 1
output["_module_oid_counts"] = dict(sorted(counts.items(),
key=lambda x: x[1], reverse=True))
return output
def _decode_hex_string(val: str) -> str:
"""
Decode SNMP hex-encoded strings like '53 46 50 2D 4C 58 2D 53 4D 00 00...'
to ASCII 'SFP-LX-SM'. Returns original string if not hex-encoded.
"""
val = val.strip()
if not val:
return val
# Check if it looks like space-separated hex bytes
parts = val.split()
if len(parts) < 2:
return val
try:
byte_vals = [int(p, 16) for p in parts]
# Filter to printable ASCII, strip nulls
decoded = ''.join(chr(b) if 32 <= b < 127 else '' for b in byte_vals)
return decoded.strip() if decoded.strip() else val
except (ValueError, IndexError):
return val
def _extract_table(walk_data: dict, entry_prefix: str,
columns: dict) -> dict:
"""
Extract a table from walk data given the entry OID prefix and
column number -> name mapping.
entry_prefix: e.g., ".1.3.6.1.4.1.22420.2.4.1.1"
columns: {"1": "id", "2": "connectorIdx", ...}
Returns: {index: {col_name: value, ...}}
"""
table = defaultdict(OrderedDict)
prefix = entry_prefix + "."
for oid, value in walk_data.items():
if not oid.startswith(prefix):
continue
remainder = oid[len(prefix):]
parts = remainder.split(".", 1)
if len(parts) == 2:
col_num, idx = parts
elif len(parts) == 1:
col_num = parts[0]
idx = "0"
else:
continue
col_name = columns.get(col_num, f"col_{col_num}")
table[idx][col_name] = value
return dict(table) if table else {}
def build_tables_csv(monitoring: dict, output_file: Path):
"""Write reconstructed tables as a single CSV with table_name, index, column, value."""
table_sections = [
"interfaces", "connectors", "power_supplies", "temperature_sensors",
"sfp_info", "sfp_diagnostics", "sfp_thresholds",
"alarm_config", "alarm_status", "port_config", "port_status",
]
with output_file.open("w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(["table", "index", "column", "value"])
for section in table_sections:
if section not in monitoring:
continue
for idx, row in monitoring[section].items():
for col, val in row.items():
writer.writerow([section, idx, col, val])
# Also write scalars
if "device" in monitoring:
for col, val in monitoring["device"].items():
writer.writerow(["device", "0", col, val])
if "alarm_general" in monitoring:
for col, val in monitoring["alarm_general"].items():
writer.writerow(["alarm_general", "0", col, val])
# ────────────────────────────────────────────────────────────────────────
# CONSOLE REPORT
# ────────────────────────────────────────────────────────────────────────
def print_report(monitoring: dict):
"""Print a human-readable summary to the console."""
dev = monitoring.get("device", {})
print("\n" + "=" * 70)
print(" ACCEDIAN GT NID — SNMP WALK ANALYSIS")
print("=" * 70)
print(f"\n Model: {dev.get('commercialName', 'N/A')}")
print(f" Hostname: {dev.get('identifier', dev.get('sysName', 'N/A'))}")
print(f" Firmware: {dev.get('firmwareVersion', 'N/A')}")
print(f" Serial: {dev.get('serialNumber', 'N/A')}")
print(f" HW Version: {dev.get('hardwareVersion', 'N/A')}")
print(f" MAC Base: {dev.get('macBaseAddr', 'N/A')}")
print(f" Uptime: {dev.get('sysUpTime', 'N/A')}")
cpu = dev.get("cpuUsageCurrent")
if cpu:
print(f" CPU: {cpu}% (15s avg: {dev.get('cpuUsageAvg15s', '?')}%)")
# Interfaces
ifaces = monitoring.get("interfaces", {})
if ifaces:
print(f"\n INTERFACES ({len(ifaces)}):")
print(f" {'Idx':<6} {'Name':<22} {'Admin':<7} {'Oper':<7} {'Speed':<12} {'MAC'}")
print(f" {'---':<6} {'----':<22} {'-----':<7} {'----':<7} {'-----':<12} {'---'}")
for idx in sorted(ifaces.keys(), key=lambda x: int(x) if x.isdigit() else 9999):
row = ifaces[idx]
name = row.get("ifDescr", row.get("ifName", "?"))
admin = row.get("ifAdminStatus", "?")
oper = row.get("ifOperStatus", "?")
speed = row.get("ifHighSpeed", row.get("ifSpeed", "?"))
if speed and speed != "?" and speed != "0":
try:
s = int(speed)
speed = f"{s} Mbps" if s < 10000 else f"{s // 1000} Gbps"
except ValueError:
pass
mac = row.get("ifPhysAddress", "")
print(f" {idx:<6} {name:<22} {admin:<7} {oper:<7} {speed:<12} {mac}")
# Connectors
conns = monitoring.get("connectors", {})
if conns:
print(f"\n CONNECTORS ({len(conns)}):")
for idx in sorted(conns.keys(), key=lambda x: int(x) if x.isdigit() else 0):
c = conns[idx]
print(f" [{idx}] {c.get('name', '?')} — type={c.get('type', '?')}, PoE={c.get('poeSupport', '?')}")
# Power Supplies
pwr = monitoring.get("power_supplies", {})
if pwr:
print(f"\n POWER SUPPLIES ({len(pwr)}):")
for idx in sorted(pwr.keys(), key=lambda x: int(x) if x.isdigit() else 0):
p = pwr[idx]
ptype = {"1": "+5V DC", "2": "-48V DC"}.get(p.get("type", ""), p.get("type", "?"))
present = "YES" if p.get("present") in ("1", "true") else "NO"
print(f" [{idx}] {p.get('name', '?')}{ptype}, present={present}")
# Temperature Sensors
temps = monitoring.get("temperature_sensors", {})
if temps:
print(f"\n TEMPERATURE SENSORS ({len(temps)}):")
for idx in sorted(temps.keys(), key=lambda x: int(x) if x.isdigit() else 0):
t = temps[idx]
label = t.get("label", f"Sensor {idx}")
curr = t.get("currentTemp", "?")
high = t.get("highThreshold", "?")
crit = t.get("criticalThreshold", "?")
print(f" [{idx}] {label}: {curr}°C (warn={high}°C, crit={crit}°C)")
# SFP Info + Diagnostics
sfp_info = monitoring.get("sfp_info", {})
sfp_diag = monitoring.get("sfp_diagnostics", {})
if sfp_info:
print(f"\n SFP TRANSCEIVERS ({len(sfp_info)}):")
for idx in sorted(sfp_info.keys(), key=lambda x: int(x) if x.isdigit() else 0):
s = sfp_info[idx]
present = s.get("present", "?")
if present in ("2", "false"):
print(f" [SFP-{idx}] NOT PRESENT")
continue
vendor = _decode_hex_string(s.get("vendor", "?")).strip()
pn = _decode_hex_string(s.get("vendorPn", "?")).strip()
sn = s.get("serialNum", "?").strip()
wl = s.get("wavelength", "?")
wl_str = f"{wl}nm" if wl and wl != "0" and wl != "?" else ""
print(f" [SFP-{idx}] {vendor} {pn} {wl_str}")
print(f" S/N: {sn}")
# Diagnostics
diag = sfp_diag.get(idx, {})
if diag:
tx = diag.get("txPower_dBm", "?").strip().strip('"')
rx = diag.get("rxPower_dBm", "?").strip().strip('"')
temp = diag.get("temperature", "?")
vcc = diag.get("supplyVoltage", "?")
lbc = diag.get("laserBiasCurrent", "?")
# Format Vcc (raw value in 100uV units -> volts)
try:
vcc_v = int(vcc) / 10000.0
vcc = f"{vcc_v:.2f}V"
except (ValueError, TypeError):
pass
print(f" TX: {tx} | RX: {rx}")
print(f" Temp: {temp}°C | Vcc: {vcc} | LBC: {lbc} uA")
# Active Alarms Summary
alarm_status = monitoring.get("alarm_status", {})
alarm_cfg = monitoring.get("alarm_config", {})
if alarm_status:
active = [(idx, a) for idx, a in alarm_status.items()
if a.get("active") in ("1", "true")]
print(f"\n ALARMS: {len(active)} active / {len(alarm_status)} total")
if active:
for idx, a in active[:20]:
cfg = alarm_cfg.get(idx, {})
desc = cfg.get("description", a.get("message", "?"))
sev_map = {"0": "INFO", "1": "MINOR", "2": "MAJOR", "3": "CRIT"}
sev = sev_map.get(cfg.get("severity", ""), "?")
print(f" [{sev:>5}] {desc}")
elif alarm_cfg:
print(f"\n ALARMS: {len(alarm_cfg)} configured (no active status data)")
# Module OID counts
counts = monitoring.get("_module_oid_counts", {})
if counts:
print(f"\n OID DISTRIBUTION BY MODULE:")
total = sum(counts.values())
for module, count in counts.items():
pct = count / total * 100
bar = "#" * int(pct / 2)
print(f" {module:<28} {count:>6} ({pct:>5.1f}%) {bar}")
print("\n" + "=" * 70)
# ────────────────────────────────────────────────────────────────────────
# MAIN
# ────────────────────────────────────────────────────────────────────────
def main():
walk_file = Path(sys.argv[1]).expanduser() if len(sys.argv) > 1 else DEFAULT_WALK
if not walk_file.is_file():
print(f"Error: Walk file not found: {walk_file}", file=sys.stderr)
sys.exit(1)
base = walk_file.with_suffix("")
out_resolved = base.parent / (base.name + "_resolved.json")
out_monitoring = base.parent / (base.name + "_monitoring.json")
out_tables = base.parent / (base.name + "_tables.csv")
print(f"Input: {walk_file}")
print(f"MIBs: {MIB_DIR}")
print(f"Output: {out_resolved}")
print(f" {out_monitoring}")
print(f" {out_tables}")
# ── Load MIBs ──
print("\nParsing Accedian MIB files...")
accedian_map = parse_mib_files(MIB_DIR)
print(f" Resolved {len(accedian_map)} OID names from MIB files")
# ── Build resolver ──
resolver = OIDResolver(accedian_map, STANDARD_OID_MAP)
# ── Parse walk ──
print("\nParsing walk file...")
walk_data, skipped = parse_walk_file(walk_file)
print(f" Parsed {len(walk_data):,} OIDs (skipped {skipped:,} lines)")
# ── Resolve and count ──
resolved_count = 0
unresolved_count = 0
for oid in walk_data:
name, idx, info = resolver.resolve(oid)
if name:
resolved_count += 1
else:
unresolved_count += 1
print(f" Resolved: {resolved_count:,} ({resolved_count/len(walk_data)*100:.1f}%)")
print(f" Unresolved: {unresolved_count:,} ({unresolved_count/len(walk_data)*100:.1f}%)")
# ── Build outputs ──
print("\nBuilding structured outputs...")
resolved = build_resolved_output(walk_data, resolver)
with out_resolved.open("w", encoding="utf-8") as f:
json.dump(resolved, f, indent=2, ensure_ascii=False)
print(f" -> {out_resolved.name} ({out_resolved.stat().st_size:,} bytes)")
monitoring = build_monitoring_output(walk_data, resolver)
with out_monitoring.open("w", encoding="utf-8") as f:
json.dump(monitoring, f, indent=2, ensure_ascii=False)
print(f" -> {out_monitoring.name} ({out_monitoring.stat().st_size:,} bytes)")
build_tables_csv(monitoring, out_tables)
print(f" -> {out_tables.name} ({out_tables.stat().st_size:,} bytes)")
# ── Console report ──
print_report(monitoring)
print(f"\nDone. Files saved to {walk_file.parent}/")
if __name__ == "__main__":
main()