nid-snmp/nid-server.py
sam c285810c68 Two-phase focused neighbor walk and fix status/optics bugs
- Restructure neighbor walk into Phase 1 (discovery: ifDescr + ifName +
  ifStackTable) and Phase 2 (targeted snmpget for matched interfaces only).
  Reduces NCS 5500 walk from ~150k OIDs to ~20k discovery + ~600 targeted.
- Rename cisco-parse.py to cisco_parse.py for Python import compatibility.
- Add parse_walk_text() for in-process parsing without file I/O.
- Fix interface status showing DOWN/ADMIN DOWN: use isUp() instead of
  hardcoded === '1' checks, add -Oe flag to snmpget for numeric enums.
- Fix optics showing raw sensor values: apply entSensorPrecision scaling
  (e.g., -95122 with precision 4 → -9.5122 dBm).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 10:58:34 -07:00

800 lines
32 KiB
Python

#!/usr/bin/env python3
"""
SNMP NID Viewer — Web Server
Serves the NID viewer HTML and provides API endpoints to trigger
live SNMP walks from the browser UI.
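Routes:
    GET  /                              — Viewer page (built from latest monitoring JSON)
    GET  /api/status                    — SSE stream of walk progress
    GET  /api/neighbor-status?target=IP — Current status of a neighbor walk
    GET  /api/neighbor-data?target=IP   — Latest neighbor monitoring JSON
    POST /api/walk — Trigger a walk { "target": "x.x.x.x", "mode": "targeted"|"full", "policies": true|false }
    POST /api/ping                      — Reachability check { "target": "x.x.x.x" }
    POST /api/clear                     — Archive walk files and reset state
    POST /api/neighbor-walk — Walk an LLDP neighbor { "target": "x.x.x.x", "remPortId": "...", "remSysName": "..." }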
Usage:
python3 nid-server.py
"""
import json
import os
import re
import shutil
import subprocess
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from pathlib import Path
# Directory containing this script; the .env file and the parser scripts
# (snmp-parse.py, cisco_parse.py) are resolved relative to it.
SCRIPT_DIR = Path(__file__).resolve().parent
# Directory where raw walk output is written; the walk functions create it on demand.
WALKS_DIR = SCRIPT_DIR / "walks"
# ── Phase 1: Discovery OIDs (lightweight, column-specific walks) ──────
# On an NCS 5500 with 10k+ interfaces, walking full ifTable/ifXTable would
# return ~150k OIDs. Instead, walk only the columns needed to identify the
# target interface and its children. ~2 OIDs per interface for discovery.
# Each entry is an (oid, label) pair.
NEIGHBOR_DISCOVERY_OIDS = [
    (".1.3.6.1.2.1.1", "System"),  # ~8 OIDs
    (".1.3.6.1.2.1.2.2.1.2", "ifDescr"),  # 1 column: ifDescr
    (".1.3.6.1.2.1.31.1.1.1.1", "ifName"),  # 1 column: ifName
    (".1.3.6.1.2.1.31.1.2", "ifStackTable"),  # parent-child relationships
]
# ── Phase 2: Per-interface OID suffixes for targeted snmpget ─────────
# After matching the target ifIndex + children, we GET only these OIDs
# for each relevant interface. ~15 OIDs per interface instead of ~150k total.
# run_neighbor_walk() appends ".<ifIndex>" to each base to form the exact OID.
NEIGHBOR_INTERFACE_OID_BASES = [
    ".1.3.6.1.2.1.2.2.1.3",  # ifType
    ".1.3.6.1.2.1.2.2.1.4",  # ifMtu
    ".1.3.6.1.2.1.2.2.1.5",  # ifSpeed
    ".1.3.6.1.2.1.2.2.1.7",  # ifAdminStatus
    ".1.3.6.1.2.1.2.2.1.8",  # ifOperStatus
    ".1.3.6.1.2.1.2.2.1.10",  # ifInOctets
    ".1.3.6.1.2.1.2.2.1.13",  # ifInDiscards
    ".1.3.6.1.2.1.2.2.1.14",  # ifInErrors
    ".1.3.6.1.2.1.2.2.1.16",  # ifOutOctets
    ".1.3.6.1.2.1.2.2.1.19",  # ifOutDiscards
    ".1.3.6.1.2.1.2.2.1.20",  # ifOutErrors
    ".1.3.6.1.2.1.31.1.1.1.6",  # ifHCInOctets
    ".1.3.6.1.2.1.31.1.1.1.10",  # ifHCOutOctets
    ".1.3.6.1.2.1.31.1.1.1.15",  # ifHighSpeed
    ".1.3.6.1.2.1.31.1.1.1.18",  # ifAlias
]
# ── Extra subtrees to walk for IOS-XE VLAN/optics (Phase 2 optional) ─
# Walked as whole subtrees (not per-interface GETs) after the targeted snmpget.
NEIGHBOR_EXTRA_OIDS = [
    (".1.3.6.1.2.1.47.1.1.1", "entPhysicalTable"),
    (".1.3.6.1.4.1.9.9.91.1.1.1", "ciscoEntitySensor"),
]
# ── OID subtrees for targeted walk (mirrors snmp-walk.sh) ────────────
# (oid, label) pairs walked by run_walk() in "targeted" mode. The
# ACD-POLICY-MIB entry is appended further down at import time when the
# SNMP_WALK_POLICIES setting is "true".
TARGETED_OIDS = [
    (".1.3.6.1.2.1.1", "System"),
    (".1.3.6.1.2.1.2", "IF-MIB"),
    (".1.3.6.1.2.1.4", "IP-MIB"),
    (".1.3.6.1.2.1.31", "IF-MIB-X"),
    (".1.3.6.1.2.1.55", "IPv6-MIB"),
    (".1.3.111.2.802.1.1.13", "LLDP-MIB"),
    (".1.3.6.1.4.1.22420.1.1", "ACD-DESC-MIB"),
    (".1.3.6.1.4.1.22420.2.1", "ACD-ALARM-MIB"),
    (".1.3.6.1.4.1.22420.2.2", "ACD-FILTER-MIB"),
    (".1.3.6.1.4.1.22420.2.4", "ACD-SFP-MIB"),
    (".1.3.6.1.4.1.22420.2.6", "ACD-REGULATOR-MIB"),
    (".1.3.6.1.4.1.22420.2.8", "ACD-SMAP-MIB"),
    (".1.3.6.1.4.1.22420.2.9", "ACD-PORT-MIB"),
]
# ── Environment / config ─────────────────────────────────────────────
def load_env(path: Path) -> dict:
    """Parse a simple ``KEY=VALUE`` .env file into a dict.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Only the first ``=`` separates key from value, so values may
    themselves contain ``=``.

    Args:
        path: Location of the .env file.

    Returns:
        Mapping of key to value (both whitespace-trimmed). Empty when the
        file does not exist.
    """
    env = {}
    if not path.is_file():
        return env
    # Read as UTF-8 explicitly so parsing does not depend on the host locale.
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, val = line.partition("=")
        key = key.strip()
        if not key:
            # "=value" carries no usable name.
            continue
        val = val.strip()
        # Remove exactly one pair of *matching* surrounding quotes. Stripping
        # every leading/trailing quote character would corrupt values that
        # legitimately start or end with a quote (e.g. SNMP passphrases).
        if len(val) >= 2 and val[0] == val[-1] and val[0] in "\"'":
            val = val[1:-1]
        env[key] = val
    return env
# Settings are read once, at import time, from the .env file next to this
# script. Each ENV.get() default below applies when the key is absent.
ENV = load_env(SCRIPT_DIR / ".env")
# int() raises ValueError at import if SERVER_PORT is not numeric.
SERVER_PORT = int(ENV.get("SERVER_PORT", "5525"))
SNMP_VERSION = ENV.get("SNMP_VERSION", "2c")
SNMP_COMMUNITY = ENV.get("SNMP_COMMUNITY", "public")
# SNMPv3 fields (future)
# Consumed by build_snmp_auth() only when SNMP_VERSION == "3".
SNMP_V3_USER = ENV.get("SNMP_V3_USER", "")
SNMP_V3_AUTH_PROTO = ENV.get("SNMP_V3_AUTH_PROTO", "SHA")
SNMP_V3_AUTH_PASS = ENV.get("SNMP_V3_AUTH_PASS", "")
SNMP_V3_PRIV_PROTO = ENV.get("SNMP_V3_PRIV_PROTO", "AES")
SNMP_V3_PRIV_PASS = ENV.get("SNMP_V3_PRIV_PASS", "")
SNMP_V3_SEC_LEVEL = ENV.get("SNMP_V3_SEC_LEVEL", "authPriv")
# Conditionally include heavy policy MIB (~73% of all OIDs)
# This mutates the module-level TARGETED_OIDS list in place; run_walk() can
# still drop the entry per request via its `policies` argument.
if ENV.get("SNMP_WALK_POLICIES", "true").lower() == "true":
    TARGETED_OIDS.append((".1.3.6.1.4.1.22420.2.3", "ACD-POLICY-MIB"))
# Neighbor device SNMP credentials (falls back to NID creds)
NEIGHBOR_SNMP_VERSION = ENV.get("NEIGHBOR_SNMP_VERSION", SNMP_VERSION)
NEIGHBOR_SNMP_COMMUNITY = ENV.get("NEIGHBOR_SNMP_COMMUNITY", SNMP_COMMUNITY)
# ── Walk state (shared across threads) ───────────────────────────────
# walk_lock guards walk_status and walk_version; set_status()/get_status()
# are the accessors that take it.
walk_lock = threading.Lock()
walk_status = {
    "state": "idle",  # idle | walking | parsing | building | complete | error
    "message": "",
    "progress": 0,  # 0-100
    "timestamp": None,
    "lines": 0,
    "elapsed": 0,
}
# Monotonically increasing version so SSE clients know when state changed
walk_version = 0
# Path to latest monitoring JSON (set after successful walk)
latest_json = None
# Neighbor walk state (independent from NID walk)
# neighbor_lock guards both dictionaries below.
neighbor_lock = threading.Lock()
neighbor_status = {}  # keyed by target IP: {"state": ..., "message": ..., "json_path": ...}
latest_neighbor = {}  # keyed by target IP: path to latest neighbor monitoring JSON
def set_status(state, message="", progress=0, **extra):
    """Publish a new NID walk status and bump the change counter.

    Args:
        state: New value for the "state" field.
        message: Human-readable status text.
        progress: Completion percentage.
        **extra: Additional fields merged into the status dict
            (run_walk passes lines, elapsed and timestamp).

    Fields not named in this call keep their previous values.
    """
    global walk_version
    core_fields = {"state": state, "message": message, "progress": progress}
    with walk_lock:
        walk_status.update(core_fields)
        walk_status.update(extra)
        # SSE streams compare this counter to detect a change.
        walk_version = walk_version + 1
def get_status():
    """Return ``(status_copy, version)`` captured under the walk lock.

    The status is a shallow copy, so callers can read it without holding
    the lock.
    """
    with walk_lock:
        snapshot = walk_status.copy()
        version = walk_version
    return snapshot, version
# ── SNMP walk execution ──────────────────────────────────────────────
def build_snmp_auth() -> list:
    """Return the net-snmp command-line authentication flags for the NID.

    For any SNMP_VERSION other than "3" this is ``-v <version> -c <community>``.
    For "3" it is the user and security level, plus auth flags unless the
    level is "noAuthNoPriv", plus privacy flags when the level is "authPriv".
    """
    if SNMP_VERSION != "3":
        return ["-v", SNMP_VERSION, "-c", SNMP_COMMUNITY]

    flags = ["-v3", "-u", SNMP_V3_USER, "-l", SNMP_V3_SEC_LEVEL]
    if SNMP_V3_SEC_LEVEL != "noAuthNoPriv":
        flags.extend(["-a", SNMP_V3_AUTH_PROTO, "-A", SNMP_V3_AUTH_PASS])
    if SNMP_V3_SEC_LEVEL == "authPriv":
        flags.extend(["-x", SNMP_V3_PRIV_PROTO, "-X", SNMP_V3_PRIV_PASS])
    return flags
def run_walk(target: str, mode: str, policies: bool = True):
    """Execute the full NID walk pipeline; intended to run in a background thread.

    Steps: walk the device (whole tree in "full" mode, otherwise the
    TARGETED_OIDS subtrees in parallel), write the raw output under
    WALKS_DIR, run snmp-parse.py on it, and record the resulting monitoring
    JSON in ``latest_json``. Progress and failures are reported through
    set_status(); nothing is returned and no exception escapes.

    Args:
        target: Dotted-quad IPv4 address of the device.
        mode: "full" walks ``.1``; any other value walks the targeted subtrees.
        policies: When false, the ACD-POLICY-MIB subtree is skipped.
    """
    global latest_json
    ip_re = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")
    if not ip_re.match(target):
        set_status("error", message=f"Invalid IP address: {target}")
        return
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    safe_ip = target.replace(".", "-")
    walk_file = WALKS_DIR / f"{safe_ip}_{timestamp}_walk.txt"
    WALKS_DIR.mkdir(parents=True, exist_ok=True)
    auth = build_snmp_auth()
    t_start = time.time()
    # Build OID list for this walk — optionally exclude heavy policy MIB
    walk_oids = list(TARGETED_OIDS)
    if not policies:
        walk_oids = [(oid, lbl) for oid, lbl in walk_oids if lbl != "ACD-POLICY-MIB"]
    try:
        # ── Step 1: snmpwalk ──────────────────────────────────────
        # Use snmpbulkwalk (GETBULK PDUs) when available — much faster
        walk_cmd = "snmpbulkwalk" if shutil.which("snmpbulkwalk") else "snmpwalk"
        bulk_args = []  # use snmpbulkwalk default (-Cr10); higher values truncate on some devices
        if mode == "full":
            set_status("walking", message="Walking full OID tree (.1)", progress=5)
            result = subprocess.run(
                [walk_cmd, "-On", "-OQ"] + bulk_args + auth + [target, ".1"],
                # errors="replace": devices may return non-UTF-8 bytes in
                # string values; a decode error must not abort the walk.
                capture_output=True, text=True, errors="replace", timeout=300,
            )
            walk_text = result.stdout
        else:
            # Walk subtrees in parallel for speed
            total = len(walk_oids)
            results_map = {}

            def walk_subtree(idx, oid):
                res = subprocess.run(
                    [walk_cmd, "-On", "-OQ"] + bulk_args + auth + [target, oid],
                    capture_output=True, text=True, errors="replace", timeout=120,
                )
                return idx, res.stdout

            set_status("walking", message=f"Walking {total} subtrees in parallel", progress=5)
            with ThreadPoolExecutor(max_workers=4) as pool:
                futures = [
                    pool.submit(walk_subtree, i, oid)
                    for i, (oid, _label) in enumerate(walk_oids)
                ]
                # Progress is counted here, in the single consuming thread,
                # so no counter is shared between the worker threads.
                for done, fut in enumerate(as_completed(futures), start=1):
                    idx, output = fut.result()
                    if output.strip():
                        results_map[idx] = output
                    pct = int((done / total) * 70)
                    set_status("walking", message=f"Walking subtrees ({done}/{total})", progress=pct)
            # Reassemble in OID order
            walk_text = "\n".join(results_map[i] for i in sorted(results_map))
        walk_file.write_text(walk_text)
        # Count from memory (no file handle left open) and ignore the blank
        # separator lines introduced when subtree outputs are joined.
        line_count = sum(1 for ln in walk_text.split("\n") if ln.strip())
        elapsed = round(time.time() - t_start, 1)
        set_status("walking", message=f"Walk complete: {line_count:,} lines in {elapsed}s",
                   progress=72, lines=line_count)
        if line_count == 0:
            set_status("error", message="Walk returned no data — check reachability and credentials",
                       elapsed=elapsed)
            return
        # ── Step 2: snmp-parse.py ─────────────────────────────────
        set_status("parsing", message="Parsing SNMP data", progress=78)
        parse_result = subprocess.run(
            [sys.executable, str(SCRIPT_DIR / "snmp-parse.py"), str(walk_file)],
            capture_output=True, text=True, timeout=120,
        )
        if parse_result.returncode != 0:
            set_status("error", message=f"Parse failed: {parse_result.stderr[:200]}")
            return
        # The parser is expected to write "<walk stem>_monitoring.json" next to the walk file.
        monitoring_json = walk_file.with_name(walk_file.stem + "_monitoring.json")
        if not monitoring_json.is_file():
            set_status("error", message="Parser did not produce monitoring JSON")
            return
        latest_json = monitoring_json
        elapsed = round(time.time() - t_start, 1)
        set_status("complete",
                   message=f"Done — {line_count:,} lines in {elapsed}s",
                   progress=100, lines=line_count, elapsed=elapsed,
                   timestamp=datetime.now().isoformat())
    except subprocess.TimeoutExpired:
        set_status("error", message="Walk timed out — device may be unreachable")
    except Exception as e:
        # Thread boundary: surface any failure to the UI instead of dying silently.
        set_status("error", message=str(e)[:300])
# ── Neighbor device walk ──────────────────────────────────────────────
def build_neighbor_snmp_auth() -> list:
    """Return net-snmp authentication flags for a neighbor device.

    Non-v3 versions use the NEIGHBOR_* version and community. Version "3"
    delegates to build_snmp_auth().

    NOTE(review): build_snmp_auth() branches on the NID's SNMP_VERSION, so a
    v3 neighbor combined with a v2c NID yields v2c flags with the NID
    community. Dedicated neighbor v3 settings are not implemented yet.
    """
    if NEIGHBOR_SNMP_VERSION != "3":
        return ["-v", NEIGHBOR_SNMP_VERSION, "-c", NEIGHBOR_SNMP_COMMUNITY]
    # Future: support v3 for neighbor
    return build_snmp_auth()
def _walk_subtrees_parallel(walk_cmd, auth, target, oid_list, status_prefix=""):
"""Walk a list of (oid, label) subtrees in parallel. Returns combined text."""
total = len(oid_list)
completed = [0]
results_map = {}
def walk_one(idx, oid, label):
try:
res = subprocess.run(
[walk_cmd, "-On", "-OQ"] + auth + [target, oid],
capture_output=True, text=True, timeout=60,
)
completed[0] += 1
with neighbor_lock:
neighbor_status[target] = {
"state": "walking",
"message": f"{status_prefix}({completed[0]}/{total})",
}
return idx, res.stdout
except subprocess.TimeoutExpired:
completed[0] += 1
return idx, ""
with ThreadPoolExecutor(max_workers=4) as pool:
futures = [
pool.submit(walk_one, i, oid, label)
for i, (oid, label) in enumerate(oid_list)
]
for fut in as_completed(futures):
idx, output = fut.result()
if output.strip():
results_map[idx] = output
return "\n".join(results_map[i] for i in sorted(results_map))
def _snmpget_batch(walk_cmd, auth, target, oid_list):
"""Run snmpget for a batch of specific OIDs. Returns raw output text.
Uses snmpget (not bulkwalk) since we're requesting exact OIDs.
Falls back to individual gets if batch fails.
"""
if not oid_list:
return ""
# snmpget can handle multiple OIDs in one call (much faster than individual)
# Split into chunks of 30 to avoid command-line length limits
all_output = []
for i in range(0, len(oid_list), 30):
chunk = oid_list[i:i + 30]
try:
res = subprocess.run(
["snmpget", "-On", "-OQ", "-Oe"] + auth + [target] + chunk,
capture_output=True, text=True, timeout=30,
)
if res.stdout.strip():
all_output.append(res.stdout)
except subprocess.TimeoutExpired:
pass
return "\n".join(all_output)
def run_neighbor_walk(target: str, rem_port_id: str, rem_sys_name: str = ""):
    """Execute a two-phase focused SNMP walk against an LLDP neighbor device.

    Phase 1 (Discovery): Walk System + ifDescr + ifName + ifStackTable
        → Identify target ifIndex and child subinterfaces
    Phase 2 (Targeted): snmpget ~15 OIDs per matched interface only
        → Full interface facts for only the relevant interfaces

    On an NCS 5500 with 10k interfaces, this reduces from ~150k OIDs
    to ~20k discovery + ~600 targeted GETs.

    The combined output is written under WALKS_DIR and parsed by
    cisco_parse.py. Progress, failure and the resulting JSON path are
    published in ``neighbor_status[target]``; on success the path is also
    stored in ``latest_neighbor[target]``. Nothing is returned and no
    exception escapes (intended to run in a background thread).

    Args:
        target: Dotted-quad IPv4 address of the neighbor.
        rem_port_id: LLDP remote port id used to match the neighbor interface.
        rem_sys_name: Accepted for API compatibility; not used by the walk.
    """

    def report(state, message):
        # Replace (not merge) the status entry, as every other writer does.
        with neighbor_lock:
            neighbor_status[target] = {"state": state, "message": message}

    ip_re = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")
    if not ip_re.match(target):
        report("error", f"Invalid IP: {target}")
        return
    report("walking", "Phase 1: Discovering interfaces...")
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    safe_ip = target.replace(".", "-")
    walk_file = WALKS_DIR / f"{safe_ip}_{timestamp}_neighbor_walk.txt"
    WALKS_DIR.mkdir(parents=True, exist_ok=True)
    auth = build_neighbor_snmp_auth()
    walk_cmd = "snmpbulkwalk" if shutil.which("snmpbulkwalk") else "snmpwalk"
    t_start = time.time()
    try:
        # ── Phase 1: Discovery walk ──────────────────────────────────
        discovery_output = _walk_subtrees_parallel(
            walk_cmd, auth, target, NEIGHBOR_DISCOVERY_OIDS,
            status_prefix="Phase 1: Discovery "
        )
        if not discovery_output.strip():
            report("error", "Discovery walk returned no data — check credentials")
            return
        # Parse discovery data in-process to find target interface.
        # Add the script directory only once; inserting on every call would
        # grow sys.path without bound in a long-running server.
        script_dir = str(SCRIPT_DIR)
        if script_dir not in sys.path:
            sys.path.insert(0, script_dir)
        from cisco_parse import parse_walk_text, build_interface_index, match_rem_port_id, \
            discover_subinterfaces_stack, discover_subinterfaces_pattern
        discovery_oids = parse_walk_text(discovery_output)
        # Used below as a mapping ifIndex -> dict of fields including "ifDescr".
        interfaces = build_interface_index(discovery_oids)
        discovery_count = len(discovery_oids)
        report(
            "walking",
            f"Phase 1 done: {discovery_count:,} OIDs, {len(interfaces)} interfaces. Matching...",
        )
        matched_ifindex = match_rem_port_id(interfaces, rem_port_id)
        if matched_ifindex is None:
            # Write what we have and let cisco-parse produce a best-effort result
            combined = discovery_output
        else:
            # Find child interfaces (subinterfaces)
            parent_descr = interfaces.get(matched_ifindex, {}).get("ifDescr", "")
            child_indices = set()
            # ifStackTable children
            child_indices |= discover_subinterfaces_stack(discovery_oids, matched_ifindex)
            # Pattern-based children (ifDescr matching)
            if parent_descr:
                pattern_children = discover_subinterfaces_pattern(discovery_oids, parent_descr)
                child_indices |= set(pattern_children.keys())
            # Also look for BDI/BVI interfaces that correlate with subinterfaces.
            # Collect the wanted names first so the interface table is scanned
            # once rather than once per child.
            wanted_bridge_names = set()
            for child_idx in child_indices:
                child_descr = interfaces.get(child_idx, {}).get("ifDescr", "")
                vlan_match = re.search(r"\.(\d+)$", child_descr)
                if vlan_match:
                    vlan_id = vlan_match.group(1)
                    wanted_bridge_names.add(f"BDI{vlan_id}")
                    wanted_bridge_names.add(f"BVI{vlan_id}")
            bvi_indices = set()
            # Also find Vlan{N} SVIs (IOS-XE)
            vlan_indices = set()
            for ifidx, info in interfaces.items():
                descr = info.get("ifDescr", "")
                if descr in wanted_bridge_names:
                    bvi_indices.add(ifidx)
                if re.match(r"^Vlan\d+$", descr):
                    vlan_indices.add(ifidx)
            all_target_indices = {matched_ifindex} | child_indices | bvi_indices | vlan_indices
            report(
                "walking",
                f"Phase 2: Getting details for {len(all_target_indices)} interfaces...",
            )
            # ── Phase 2: Targeted snmpget ────────────────────────────
            target_oids = [
                f"{base_oid}.{ifidx}"
                for ifidx in all_target_indices
                for base_oid in NEIGHBOR_INTERFACE_OID_BASES
            ]
            phase2_output = _snmpget_batch(walk_cmd, auth, target, target_oids)
            # Also walk optics/entity subtrees (small on most devices)
            extra_output = _walk_subtrees_parallel(
                walk_cmd, auth, target, NEIGHBOR_EXTRA_OIDS,
                status_prefix="Phase 2: Optics/Entity "
            )
            # Combine all phases into one walk file
            combined = discovery_output
            if phase2_output.strip():
                combined += "\n" + phase2_output
            if extra_output.strip():
                combined += "\n" + extra_output
        walk_file.write_text(combined)
        # Count from memory (no file handle left open) and skip the blank
        # separator lines introduced when outputs are joined.
        line_count = sum(1 for ln in combined.split("\n") if ln.strip())
        elapsed = round(time.time() - t_start, 1)
        if line_count == 0:
            report("error", "Walk returned no data — check credentials")
            return
        # ── Parse combined data with cisco-parse.py ──────────────────
        report("parsing", "Parsing neighbor data...")
        parse_result = subprocess.run(
            [sys.executable, str(SCRIPT_DIR / "cisco_parse.py"),
             str(walk_file), rem_port_id],
            capture_output=True, text=True, timeout=60,
        )
        if parse_result.returncode != 0:
            report("error", f"Parse failed: {parse_result.stderr[:200]}")
            return
        # The parser is expected to write this file next to the walk file.
        neighbor_json = walk_file.with_name(walk_file.stem + "_neighbor_monitoring.json")
        if not neighbor_json.is_file():
            report("error", "Parser did not produce neighbor JSON")
            return
        # Parsed-OID count and text-line count are different measures; clamp
        # so the summary never shows a negative "targeted" figure.
        targeted_count = max(line_count - discovery_count, 0)
        # Publish the result path and the final status under one lock hold.
        with neighbor_lock:
            latest_neighbor[target] = neighbor_json
            neighbor_status[target] = {
                "state": "complete",
                "message": f"Done — {line_count:,} OIDs ({discovery_count:,} discovery + {targeted_count:,} targeted) in {elapsed}s",
                "json_path": str(neighbor_json),
            }
    except Exception as e:
        # Thread boundary: report any failure (including parser timeouts) to the UI.
        report("error", str(e)[:300])
# ── Find latest monitoring JSON ──────────────────────────────────────
def find_latest_json() -> Path | None:
    """Return the NID monitoring JSON to display, or None if there is none.

    Prefers the file recorded by the most recent successful walk in this
    process. Otherwise falls back to the most recently modified
    ``*_monitoring.json`` in WALKS_DIR.
    """
    if latest_json and latest_json.is_file():
        return latest_json
    # Neighbor results are named "*_neighbor_monitoring.json" and therefore
    # also match the glob; skip them so neighbor data is never rendered as
    # the NID's own dataset.
    candidates = [
        p for p in WALKS_DIR.glob("*_monitoring.json")
        if not p.name.endswith("_neighbor_monitoring.json")
    ]
    return max(candidates, key=lambda p: p.stat().st_mtime, default=None)
# ── HTTP handler ─────────────────────────────────────────────────────
class NIDHandler(BaseHTTPRequestHandler):
    """Serve the viewer page and the walk / neighbor JSON API.

    POST handlers read a JSON object from the request body. Malformed
    bodies are answered with HTTP 400 rather than raising inside the
    handler.
    """

    # Dotted-quad shape check applied to every target before it is passed
    # to an external command.
    _IP_RE = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")
    # Serialises the "is a walk running?" check with the transition to
    # "walking" so two concurrent POSTs cannot both start a walk.
    _walk_start_lock = threading.Lock()

    def log_message(self, format, *args):
        # Cleaner logging
        print(f"[{self.log_date_time_string()}] {format % args}")

    def _send(self, code, content_type, body):
        """Send a complete response; ``body`` must be bytes."""
        self.send_response(code)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Cache-Control", "no-store")
        self.end_headers()
        self.wfile.write(body)

    def _send_json(self, code, obj):
        """Serialise ``obj`` as JSON and send it with the given status code."""
        self._send(code, "application/json", json.dumps(obj).encode())

    def _read_json_body(self):
        """Return the request body as a dict, or None after replying 400.

        An absent or empty body yields ``{}``.
        """
        try:
            length = int(self.headers.get("Content-Length") or 0)
        except ValueError:
            self._send_json(400, {"error": "Invalid Content-Length header"})
            return None
        if length <= 0:
            return {}
        try:
            body = json.loads(self.rfile.read(length))
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
            self._send_json(400, {"error": "Request body must be valid JSON"})
            return None
        if not isinstance(body, dict):
            self._send_json(400, {"error": "Request body must be a JSON object"})
            return None
        return body

    @staticmethod
    def _str_field(body, key, default=""):
        """Return ``body[key]`` stripped; non-string values count as empty."""
        value = body.get(key, default)
        return value.strip() if isinstance(value, str) else ""

    # ── GET ────────────────────────────────────────────────────────
    def do_GET(self):
        if self.path == "/":
            self._serve_viewer()
        elif self.path == "/api/status":
            self._serve_sse()
        elif self.path.startswith("/api/neighbor-data"):
            self._handle_neighbor_data()
        elif self.path.startswith("/api/neighbor-status"):
            self._handle_neighbor_status()
        else:
            self._send(404, "text/plain", b"Not found")

    def _serve_viewer(self):
        """Build and serve the viewer HTML from latest data."""
        # Import build_html lazily to avoid circular issues at module level.
        # Only add the script directory once so sys.path does not grow with
        # every page load.
        script_dir = str(SCRIPT_DIR)
        if script_dir not in sys.path:
            sys.path.insert(0, script_dir)
        from build_nid_viewer import build_html
        json_path = find_latest_json()
        if json_path:
            with json_path.open() as f:
                data = json.load(f)
        else:
            data = {}
        # Merge any available neighbor data into the viewer data.
        # Snapshot the mapping under the lock, then do file I/O outside it
        # so slow reads do not block walk threads.
        with neighbor_lock:
            neighbor_paths = dict(latest_neighbor)
        nd = {}
        for ip, npath in neighbor_paths.items():
            if not npath or not npath.is_file():
                continue
            try:
                with npath.open() as f:
                    nd[ip] = json.load(f)
            except (OSError, ValueError):
                # Best effort: an unreadable neighbor file is left out of the page.
                continue
        if nd:
            data["neighbor_data"] = nd
        html = build_html(data)
        self._send(200, "text/html; charset=utf-8", html.encode())

    def _serve_sse(self):
        """Stream walk status as Server-Sent Events."""
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-store")
        self.send_header("Connection", "keep-alive")
        self.end_headers()
        last_version = -1
        try:
            while True:
                status, version = get_status()
                if version != last_version:
                    msg = f"data: {json.dumps(status)}\n\n"
                    self.wfile.write(msg.encode())
                    self.wfile.flush()
                    last_version = version
                    # Close SSE after terminal states so client triggers reload
                    if status["state"] in ("complete", "error"):
                        break
                time.sleep(0.3)
        except (BrokenPipeError, ConnectionResetError):
            # Client went away; nothing left to stream.
            pass

    # ── POST ───────────────────────────────────────────────────────
    def do_POST(self):
        if self.path == "/api/walk":
            self._handle_walk()
        elif self.path == "/api/ping":
            self._handle_ping()
        elif self.path == "/api/clear":
            self._handle_clear()
        elif self.path == "/api/neighbor-walk":
            self._handle_neighbor_walk()
        else:
            self._send(404, "text/plain", b"Not found")

    def _handle_ping(self):
        """Ping a target IP to check reachability."""
        body = self._read_json_body()
        if body is None:
            return
        target = self._str_field(body, "target")
        if not target:
            self._send_json(400, {"error": "target IP required"})
            return
        if not self._IP_RE.match(target):
            self._send_json(400, {"error": f"Invalid IP address: {target}"})
            return
        try:
            result = subprocess.run(
                ["ping", "-c", "1", "-W", "2", target],
                capture_output=True, text=True, timeout=5,
            )
            reachable = result.returncode == 0
        except (subprocess.TimeoutExpired, FileNotFoundError):
            reachable = False
        self._send_json(200, {"reachable": reachable, "target": target})

    def _handle_walk(self):
        """Start a walk in a background thread."""
        body = self._read_json_body()
        if body is None:
            return
        target = self._str_field(body, "target")
        mode = self._str_field(body, "mode", "targeted")
        policies = body.get("policies", True)
        if not target:
            self._send_json(400, {"error": "target IP required"})
            return
        # Validate up front, like the ping and neighbor endpoints, so the
        # client gets a 400 instead of a "started" reply for a bad address.
        if not self._IP_RE.match(target):
            self._send_json(400, {"error": f"Invalid IP address: {target}"})
            return
        if mode not in ("targeted", "full"):
            self._send_json(400, {"error": "mode must be 'targeted' or 'full'"})
            return
        # Check and claim atomically with respect to other requests.
        with self._walk_start_lock:
            current, _ = get_status()
            busy = current["state"] in ("walking", "parsing")
            if not busy:
                set_status("walking", message="Starting walk...", progress=1)
        if busy:
            self._send_json(409, {"error": "Walk already in progress"})
            return
        thread = threading.Thread(target=run_walk, args=(target, mode, policies), daemon=True)
        thread.start()
        self._send_json(200, {"status": "started", "target": target, "mode": mode})

    def _handle_neighbor_walk(self):
        """Start a neighbor device walk in a background thread."""
        body = self._read_json_body()
        if body is None:
            return
        target = self._str_field(body, "target")
        rem_port_id = self._str_field(body, "remPortId")
        rem_sys_name = self._str_field(body, "remSysName")
        if not target or not rem_port_id:
            self._send_json(400, {"error": "target and remPortId required"})
            return
        if not self._IP_RE.match(target):
            self._send_json(400, {"error": f"Invalid IP address: {target}"})
            return
        # Check if already walking this neighbor. The slot is claimed under
        # the same lock hold so a second request cannot slip in before the
        # worker thread publishes its first status.
        with neighbor_lock:
            state = neighbor_status.get(target, {}).get("state")
            busy = state in ("walking", "parsing")
            if not busy:
                neighbor_status[target] = {
                    "state": "walking",
                    "message": "Starting neighbor walk...",
                }
        if busy:
            self._send_json(409, {"error": f"Already walking {target}"})
            return
        thread = threading.Thread(
            target=run_neighbor_walk,
            args=(target, rem_port_id, rem_sys_name),
            daemon=True,
        )
        thread.start()
        self._send_json(200, {"status": "started", "target": target})

    def _handle_neighbor_data(self):
        """Return the latest neighbor monitoring JSON for a given target IP."""
        # Parse ?target=x.x.x.x from query string
        from urllib.parse import urlparse, parse_qs
        qs = parse_qs(urlparse(self.path).query)
        target = qs.get("target", [None])[0]
        if not target:
            self._send_json(400, {"error": "target query param required"})
            return
        with neighbor_lock:
            json_path = latest_neighbor.get(target)
        if not json_path or not json_path.is_file():
            self._send_json(404, {"error": f"No neighbor data for {target}"})
            return
        try:
            with json_path.open() as f:
                data = json.load(f)
        except (OSError, ValueError):
            # The file can be archived or be unreadable between the check and the read.
            self._send_json(500, {"error": f"Could not read neighbor data for {target}"})
            return
        self._send_json(200, data)

    def _handle_neighbor_status(self):
        """Return the current walk status for a neighbor target."""
        from urllib.parse import urlparse, parse_qs
        qs = parse_qs(urlparse(self.path).query)
        target = qs.get("target", [None])[0]
        if not target:
            self._send_json(400, {"error": "target query param required"})
            return
        with neighbor_lock:
            status = neighbor_status.get(target, {"state": "idle", "message": ""})
        self._send_json(200, status)

    def _handle_clear(self):
        """Move all walk data to walks/archive/ and reset state."""
        global latest_json
        archive = WALKS_DIR / "archive"
        archive.mkdir(parents=True, exist_ok=True)
        moved = 0
        # Materialise the listing first; entries are moved while looping.
        for f in list(WALKS_DIR.iterdir()):
            if f.is_file():
                f.rename(archive / f.name)
                moved += 1
        latest_json = None
        # Neighbor results point at files that were just archived. Drop them,
        # but keep status entries of walks that are still in flight.
        with neighbor_lock:
            latest_neighbor.clear()
            for ip in list(neighbor_status):
                if neighbor_status[ip].get("state") not in ("walking", "parsing"):
                    del neighbor_status[ip]
        set_status("idle")
        self._send_json(200, {"status": "cleared", "files_archived": moved})
# ── Main ─────────────────────────────────────────────────────────────
def main():
    """Run the HTTP server on all interfaces at SERVER_PORT until Ctrl+C.

    The listening socket is closed on the way out, whether the server stops
    because of Ctrl+C or because serve_forever() raised.
    """
    server = ThreadingHTTPServer(("0.0.0.0", SERVER_PORT), NIDHandler)
    print(f"╔═══════════════════════════════════════════════════╗")
    print(f"║ SNMP NID Viewer — Web Server ║")
    print(f"║ http://localhost:{SERVER_PORT:<5}")
    print(f"╚═══════════════════════════════════════════════════╝")
    print(f" SNMPv{SNMP_VERSION} | Press Ctrl+C to stop")
    print()
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down.")
    finally:
        # serve_forever() has already returned at this point, so shutdown()
        # would have nothing to stop; server_close() is what releases the
        # listening socket.
        server.server_close()


if __name__ == "__main__":
    main()