#!/usr/bin/env python3
"""
SNMP NID Viewer — Web Server

Serves the NID viewer HTML and provides API endpoints to trigger live
SNMP walks from the browser UI.

Routes:
    GET  /                      — Viewer page (built from latest monitoring JSON)
    GET  /api/status            — SSE stream of walk progress
    GET  /api/neighbor-data     — Latest neighbor monitoring JSON (?target=x.x.x.x)
    GET  /api/neighbor-status   — Neighbor walk status (?target=x.x.x.x)
    POST /api/walk              — Trigger a walk
                                  { "target": "x.x.x.x", "mode": "targeted"|"full" }
    POST /api/ping              — Reachability check { "target": "x.x.x.x" }
    POST /api/clear             — Archive walk files and reset state
    POST /api/neighbor-walk     — Walk an LLDP neighbor
                                  { "target": "x.x.x.x", "remPortId": "..." }

Usage:
    python3 nid-server.py
"""

import json
import os
import re
import shutil
import subprocess
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from pathlib import Path

# Directory containing this script; helper scripts and .env are resolved
# relative to it, and walk output is written to the "walks" subdirectory.
SCRIPT_DIR = Path(__file__).resolve().parent
WALKS_DIR = SCRIPT_DIR / "walks"

# ── Phase 1: Discovery OIDs (lightweight, column-specific walks) ──────
# On an NCS 5500 with 10k+ interfaces, walking full ifTable/ifXTable would
# return ~150k OIDs. Instead, walk only the columns needed to identify the
# target interface and its children. ~2 OIDs per interface for discovery.
# Each entry is an (oid_prefix, label) pair.
NEIGHBOR_DISCOVERY_OIDS = [
    (".1.3.6.1.2.1.1", "System"),                 # ~8 OIDs
    (".1.3.6.1.2.1.2.2.1.2", "ifDescr"),          # 1 column: ifDescr
    (".1.3.6.1.2.1.31.1.1.1.1", "ifName"),        # 1 column: ifName
    (".1.3.6.1.2.1.31.1.2", "ifStackTable"),      # parent-child relationships
]

# ── Phase 2: Per-interface OID suffixes for targeted snmpget ─────────
# After matching the target ifIndex + children, we GET only these OIDs
# for each relevant interface. ~15 OIDs per interface instead of ~150k total.
NEIGHBOR_INTERFACE_OID_BASES = [
    ".1.3.6.1.2.1.2.2.1.3",        # ifType
    ".1.3.6.1.2.1.2.2.1.4",        # ifMtu
    ".1.3.6.1.2.1.2.2.1.5",        # ifSpeed
    ".1.3.6.1.2.1.2.2.1.7",        # ifAdminStatus
    ".1.3.6.1.2.1.2.2.1.8",        # ifOperStatus
    ".1.3.6.1.2.1.2.2.1.10",       # ifInOctets
    ".1.3.6.1.2.1.2.2.1.13",       # ifInDiscards
    ".1.3.6.1.2.1.2.2.1.14",       # ifInErrors
    ".1.3.6.1.2.1.2.2.1.16",       # ifOutOctets
    ".1.3.6.1.2.1.2.2.1.19",       # ifOutDiscards
    ".1.3.6.1.2.1.2.2.1.20",       # ifOutErrors
    ".1.3.6.1.2.1.31.1.1.1.6",     # ifHCInOctets
    ".1.3.6.1.2.1.31.1.1.1.10",    # ifHCOutOctets
    ".1.3.6.1.2.1.31.1.1.1.15",    # ifHighSpeed
    ".1.3.6.1.2.1.31.1.1.1.18",    # ifAlias
]

# ── Extra subtrees to walk for IOS-XE VLAN/optics (Phase 2 optional) ─
NEIGHBOR_EXTRA_OIDS = [
    (".1.3.6.1.2.1.47.1.1.1", "entPhysicalTable"),
    (".1.3.6.1.4.1.9.9.91.1.1.1", "ciscoEntitySensor"),
]

# ── OID subtrees for targeted walk (mirrors snmp-walk.sh) ────────────
# Each entry is an (oid_prefix, label) pair; the list is mutable because
# later configuration code may append optional subtrees to it.
TARGETED_OIDS = [
    (".1.3.6.1.2.1.1", "System"),
    (".1.3.6.1.2.1.2", "IF-MIB"),
    (".1.3.6.1.2.1.4", "IP-MIB"),
    (".1.3.6.1.2.1.31", "IF-MIB-X"),
    (".1.3.6.1.2.1.55", "IPv6-MIB"),
    (".1.3.111.2.802.1.1.13", "LLDP-MIB"),
    (".1.3.6.1.4.1.22420.1.1", "ACD-DESC-MIB"),
    (".1.3.6.1.4.1.22420.2.1", "ACD-ALARM-MIB"),
    (".1.3.6.1.4.1.22420.2.2", "ACD-FILTER-MIB"),
    (".1.3.6.1.4.1.22420.2.4", "ACD-SFP-MIB"),
    (".1.3.6.1.4.1.22420.2.6", "ACD-REGULATOR-MIB"),
    (".1.3.6.1.4.1.22420.2.8", "ACD-SMAP-MIB"),
    (".1.3.6.1.4.1.22420.2.9", "ACD-PORT-MIB"),
]


# ── Environment / config ─────────────────────────────────────────────

def load_env(path: Path) -> dict:
    """Parse a .env file into a dict of string keys to string values.

    The format is plain ``KEY=value``, one entry per line. Blank lines,
    lines starting with ``#``, lines without ``=`` and lines with an empty
    key are skipped. Whitespace around the key and the value is removed,
    and a value wrapped in one matching pair of single or double quotes
    has that pair removed. Later duplicates of a key override earlier ones.

    Args:
        path: Location of the .env file.

    Returns:
        The parsed mapping; empty when ``path`` is not an existing file.
    """
    env = {}
    if not path.is_file():
        return env

    # Decode as UTF-8 regardless of the process locale; undecodable bytes are
    # replaced instead of aborting start-up on a stray character.
    text = path.read_text(encoding="utf-8", errors="replace")
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, val = line.partition("=")
        key = key.strip()
        if not key:
            continue
        val = val.strip()
        # Remove exactly one *matching* pair of surrounding quotes. Stripping
        # every quote character from both ends would corrupt values that
        # legitimately begin or end with a quote (e.g. a password ending in ').
        if len(val) >= 2 and val[0] == val[-1] and val[0] in ('"', "'"):
            val = val[1:-1]
        env[key] = val
    return env


ENV = load_env(SCRIPT_DIR / ".env")

SERVER_PORT = int(ENV.get("SERVER_PORT", "5525"))
SNMP_VERSION = ENV.get("SNMP_VERSION", "2c")
SNMP_COMMUNITY = ENV.get("SNMP_COMMUNITY", "public")

# SNMPv3 fields (future)
SNMP_V3_USER = ENV.get("SNMP_V3_USER", "")
SNMP_V3_AUTH_PROTO = ENV.get("SNMP_V3_AUTH_PROTO", "SHA")
SNMP_V3_AUTH_PASS = ENV.get("SNMP_V3_AUTH_PASS", "")
SNMP_V3_PRIV_PROTO = ENV.get("SNMP_V3_PRIV_PROTO", "AES")
SNMP_V3_PRIV_PASS = ENV.get("SNMP_V3_PRIV_PASS", "")
SNMP_V3_SEC_LEVEL = ENV.get("SNMP_V3_SEC_LEVEL", "authPriv")

# Conditionally include heavy policy MIB (~73% of all OIDs)
if ENV.get("SNMP_WALK_POLICIES", "true").lower() == "true":
    TARGETED_OIDS.append((".1.3.6.1.4.1.22420.2.3", "ACD-POLICY-MIB"))

# Neighbor device SNMP credentials (falls back to NID creds)
NEIGHBOR_SNMP_VERSION = ENV.get("NEIGHBOR_SNMP_VERSION", SNMP_VERSION)
NEIGHBOR_SNMP_COMMUNITY = ENV.get("NEIGHBOR_SNMP_COMMUNITY", SNMP_COMMUNITY)

# ── Walk state (shared across threads) ───────────────────────────────

walk_lock = threading.Lock()
walk_status = {
    "state": "idle",       # idle | walking | parsing | building | complete | error
    "message": "",
    "progress": 0,         # 0-100
    "timestamp": None,
    "lines": 0,
    "elapsed": 0,
}
# Monotonically increasing version so SSE clients know when state changed
walk_version = 0

# Path to latest monitoring JSON (set after successful walk)
latest_json = None

# Neighbor walk state (independent from NID walk)
neighbor_lock = threading.Lock()
neighbor_status = {}   # keyed by target IP: {"state": ..., "message": ..., "json_path": ...}
latest_neighbor = {}   # keyed by target IP: path to latest neighbor monitoring JSON

# States during which a walk is considered to be running.
_ACTIVE_WALK_STATES = ("walking", "parsing", "building")
_ACTIVE_NEIGHBOR_STATES = ("walking", "parsing")

# Dotted-quad shape check; re.ASCII keeps \d from matching non-ASCII digits.
_IPV4_RE = re.compile(r"\d{1,3}(\.\d{1,3}){3}", re.ASCII)
# ifDescr of an IOS-XE SVI, e.g. "Vlan100".
_VLAN_SVI_RE = re.compile(r"^Vlan\d+$")
# Suffix used by the file written for neighbor walks; the NID lookup must
# not pick these files up.
_NEIGHBOR_JSON_SUFFIX = "_neighbor_monitoring.json"


def _is_valid_ipv4(value) -> bool:
    """Return True when ``value`` is a dotted-quad IPv4 string with octets 0-255."""
    if not isinstance(value, str) or not _IPV4_RE.fullmatch(value):
        return False
    return all(int(octet) <= 255 for octet in value.split("."))


def _ensure_script_dir_on_path():
    """Make sibling modules importable without growing sys.path on every call."""
    script_dir = str(SCRIPT_DIR)
    if script_dir not in sys.path:
        sys.path.insert(0, script_dir)


def _count_lines(path: Path) -> int:
    """Count the lines of ``path``, closing the file handle afterwards."""
    with path.open() as fh:
        return sum(1 for _ in fh)


def set_status(state, message="", progress=0, **extra):
    """Replace the NID walk status and bump the change counter.

    ``state``, ``message`` and ``progress`` are always overwritten; any
    keyword in ``extra`` is merged into the status dict. Keys that are not
    passed keep their previous value.
    """
    global walk_version
    with walk_lock:
        walk_status["state"] = state
        walk_status["message"] = message
        walk_status["progress"] = progress
        walk_status.update(extra)
        walk_version += 1


def get_status():
    """Return ``(copy_of_status_dict, version)`` read under the status lock."""
    with walk_lock:
        return dict(walk_status), walk_version


def _try_claim_walk(message: str) -> bool:
    """Atomically move the NID walk state to "walking" unless a walk is active.

    Checking and setting under a single lock acquisition prevents two
    concurrent requests from both starting a walk.

    Returns:
        True when the caller now owns the walk, False when one is running.
    """
    global walk_version
    with walk_lock:
        if walk_status["state"] in _ACTIVE_WALK_STATES:
            return False
        walk_status["state"] = "walking"
        walk_status["message"] = message
        walk_status["progress"] = 1
        walk_version += 1
    return True


# ── SNMP walk execution ──────────────────────────────────────────────

def _build_v3_auth() -> list:
    """Build SNMPv3 flags from the SNMP_V3_* settings."""
    args = ["-v3", "-u", SNMP_V3_USER, "-l", SNMP_V3_SEC_LEVEL]
    if SNMP_V3_SEC_LEVEL != "noAuthNoPriv":
        args += ["-a", SNMP_V3_AUTH_PROTO, "-A", SNMP_V3_AUTH_PASS]
    if SNMP_V3_SEC_LEVEL == "authPriv":
        args += ["-x", SNMP_V3_PRIV_PROTO, "-X", SNMP_V3_PRIV_PASS]
    return args


def build_snmp_auth() -> list:
    """Build snmpwalk authentication flags from env config.

    Returns:
        Argument list for the net-snmp command-line tools: v3 user/level
        flags when SNMP_VERSION is "3", otherwise ``-v <version> -c <community>``.
    """
    # NOTE(review): secrets are passed as process arguments and are therefore
    # visible in the process list of the host running this server.
    if SNMP_VERSION == "3":
        return _build_v3_auth()
    return ["-v", SNMP_VERSION, "-c", SNMP_COMMUNITY]


def run_walk(target: str, mode: str, policies: bool = True):
    """Execute the full walk pipeline in a background thread.

    Steps: run snmpbulkwalk/snmpwalk against ``target`` (whole tree in
    "full" mode, otherwise the TARGETED_OIDS subtrees in parallel), write
    the raw output to WALKS_DIR, run snmp-parse.py on it, and record the
    resulting monitoring JSON in ``latest_json``. Progress and failures are
    reported through set_status(); the function itself returns None and
    does not raise.

    Args:
        target: IPv4 address of the device.
        mode: "full" walks ``.1``; any other value walks the subtree list.
        policies: When false, the "ACD-POLICY-MIB" subtree is skipped.
    """
    global latest_json

    if not _is_valid_ipv4(target):
        set_status("error", message=f"Invalid IP address: {target}")
        return

    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    safe_ip = target.replace(".", "-")
    walk_file = WALKS_DIR / f"{safe_ip}_{timestamp}_walk.txt"
    auth = build_snmp_auth()
    t_start = time.time()

    # Build OID list for this walk — optionally exclude heavy policy MIB
    walk_oids = list(TARGETED_OIDS)
    if not policies:
        walk_oids = [(oid, lbl) for oid, lbl in walk_oids if lbl != "ACD-POLICY-MIB"]

    try:
        # Inside the try so a filesystem error ends in an "error" status
        # instead of leaving the state stuck at "walking".
        WALKS_DIR.mkdir(parents=True, exist_ok=True)

        # ── Step 1: snmpwalk ──────────────────────────────────────
        # Use snmpbulkwalk (GETBULK PDUs) when available — much faster
        walk_cmd = "snmpbulkwalk" if shutil.which("snmpbulkwalk") else "snmpwalk"
        bulk_args = []  # use snmpbulkwalk default (-Cr10); higher values truncate on some devices

        if mode == "full":
            set_status("walking", message="Walking full OID tree (.1)", progress=5)
            result = subprocess.run(
                [walk_cmd, "-On", "-OQ"] + bulk_args + auth + [target, ".1"],
                capture_output=True, text=True, timeout=300,
            )
            walk_file.write_text(result.stdout)
        else:
            # Walk subtrees in parallel for speed
            total = len(walk_oids)
            completed = [0]  # shared counter, guarded by progress_lock
            progress_lock = threading.Lock()
            results_map = {}

            def walk_subtree(idx, oid, label):
                res = subprocess.run(
                    [walk_cmd, "-On", "-OQ"] + bulk_args + auth + [target, oid],
                    capture_output=True, text=True, timeout=120,
                )
                # Count and publish under one lock so concurrent workers
                # neither lose increments nor report progress out of order.
                with progress_lock:
                    completed[0] += 1
                    done = completed[0]
                    pct = int((done / total) * 70)
                    set_status("walking",
                               message=f"Walking subtrees ({done}/{total})",
                               progress=pct)
                return idx, res.stdout

            set_status("walking",
                       message=f"Walking {total} subtrees in parallel",
                       progress=5)

            with ThreadPoolExecutor(max_workers=4) as pool:
                futures = [
                    pool.submit(walk_subtree, i, oid, label)
                    for i, (oid, label) in enumerate(walk_oids)
                ]
                for fut in as_completed(futures):
                    # A timeout in any subtree propagates from result() and
                    # fails the whole walk via the handler below.
                    idx, output = fut.result()
                    if output.strip():
                        results_map[idx] = output

            # Reassemble in OID order
            output_lines = [results_map[i] for i in sorted(results_map)]
            walk_file.write_text("\n".join(output_lines))

        line_count = _count_lines(walk_file)
        elapsed = round(time.time() - t_start, 1)
        set_status("walking",
                   message=f"Walk complete: {line_count:,} lines in {elapsed}s",
                   progress=72, lines=line_count)

        if line_count == 0:
            set_status("error",
                       message="Walk returned no data — check reachability and credentials",
                       elapsed=elapsed)
            return

        # ── Step 2: snmp-parse.py ─────────────────────────────────
        set_status("parsing", message="Parsing SNMP data", progress=78)
        parse_result = subprocess.run(
            [sys.executable, str(SCRIPT_DIR / "snmp-parse.py"), str(walk_file)],
            capture_output=True, text=True, timeout=120,
        )
        if parse_result.returncode != 0:
            set_status("error", message=f"Parse failed: {parse_result.stderr[:200]}")
            return

        monitoring_json = walk_file.with_name(walk_file.stem + "_monitoring.json")
        if not monitoring_json.is_file():
            set_status("error", message="Parser did not produce monitoring JSON")
            return

        latest_json = monitoring_json
        elapsed = round(time.time() - t_start, 1)
        set_status("complete",
                   message=f"Done — {line_count:,} lines in {elapsed}s",
                   progress=100, lines=line_count, elapsed=elapsed,
                   timestamp=datetime.now().isoformat())

    except subprocess.TimeoutExpired:
        set_status("error", message="Walk timed out — device may be unreachable")
    except Exception as e:
        # Top-level boundary of the worker thread: report instead of dying
        # silently with the status stuck in an active state.
        set_status("error", message=str(e)[:300])


# ── Neighbor device walk ──────────────────────────────────────────────

def build_neighbor_snmp_auth() -> list:
    """Build snmpwalk auth flags for neighbor device (falls back to NID creds).

    For SNMPv3 the shared SNMP_V3_* settings are used even when the NID
    itself is configured for a different version; there are no
    neighbor-specific v3 credentials yet.
    """
    if NEIGHBOR_SNMP_VERSION == "3":
        # Future: support dedicated v3 credentials for the neighbor
        return _build_v3_auth()
    return ["-v", NEIGHBOR_SNMP_VERSION, "-c", NEIGHBOR_SNMP_COMMUNITY]


def _walk_subtrees_parallel(walk_cmd, auth, target, oid_list, status_prefix=""):
    """Walk a list of (oid, label) subtrees in parallel. Returns combined text.

    Each successful subtree updates ``neighbor_status[target]`` with a
    ``"<status_prefix>(done/total)"`` message. A subtree that times out
    contributes no output and does not abort the others. Non-empty outputs
    are joined with newlines in the order of ``oid_list``.
    """
    total = len(oid_list)
    completed = [0]  # shared counter, guarded by neighbor_lock
    results_map = {}

    def walk_one(idx, oid, label):
        try:
            res = subprocess.run(
                [walk_cmd, "-On", "-OQ"] + auth + [target, oid],
                capture_output=True, text=True, timeout=60,
            )
        except subprocess.TimeoutExpired:
            # Best effort: a slow subtree is skipped rather than fatal.
            with neighbor_lock:
                completed[0] += 1
            return idx, ""
        with neighbor_lock:
            completed[0] += 1
            neighbor_status[target] = {
                "state": "walking",
                "message": f"{status_prefix}({completed[0]}/{total})",
            }
        return idx, res.stdout

    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [
            pool.submit(walk_one, i, oid, label)
            for i, (oid, label) in enumerate(oid_list)
        ]
        for fut in as_completed(futures):
            idx, output = fut.result()
            if output.strip():
                results_map[idx] = output

    return "\n".join(results_map[i] for i in sorted(results_map))


def _snmpget_batch(walk_cmd, auth, target, oid_list):
    """Run snmpget for a batch of specific OIDs. Returns raw output text.

    Uses snmpget (not bulkwalk) since we're requesting exact OIDs. The OIDs
    are requested in chunks of 30 per process; a chunk that times out is
    skipped and the remaining chunks are still attempted.

    Args:
        walk_cmd: Unused; kept so the signature mirrors the walk helpers.
        auth: Authentication flags for the net-snmp tools.
        target: Device address.
        oid_list: Fully qualified OIDs to fetch.
    """
    if not oid_list:
        return ""

    # snmpget can handle multiple OIDs in one call (much faster than individual)
    # Split into chunks of 30 to avoid command-line length limits
    all_output = []
    for start in range(0, len(oid_list), 30):
        chunk = oid_list[start:start + 30]
        try:
            res = subprocess.run(
                ["snmpget", "-On", "-OQ", "-Oe"] + auth + [target] + chunk,
                capture_output=True, text=True, timeout=30,
            )
        except subprocess.TimeoutExpired:
            continue  # best effort: keep whatever the other chunks return
        if res.stdout.strip():
            all_output.append(res.stdout)

    return "\n".join(all_output)


def run_neighbor_walk(target: str, rem_port_id: str, rem_sys_name: str = ""):
    """Execute a two-phase focused SNMP walk against an LLDP neighbor device.

    Phase 1 (Discovery): Walk System + ifDescr + ifName + ifStackTable
        → Identify target ifIndex and child subinterfaces
    Phase 2 (Targeted):  snmpget ~15 OIDs per matched interface only
        → Full interface facts for only the relevant interfaces

    On an NCS 5500 with 10k interfaces, this reduces from ~150k OIDs
    to ~20k discovery + ~600 targeted GETs.

    Progress, errors and the resulting JSON path are published in
    ``neighbor_status[target]`` / ``latest_neighbor[target]``; the function
    returns None and does not raise.

    Args:
        target: IPv4 address of the neighbor.
        rem_port_id: LLDP remote port id used to find the interface; also
            passed on to cisco_parse.py.
        rem_sys_name: Accepted for the caller's convenience; not used here.
    """
    if not _is_valid_ipv4(target):
        with neighbor_lock:
            neighbor_status[target] = {"state": "error", "message": f"Invalid IP: {target}"}
        return

    with neighbor_lock:
        neighbor_status[target] = {"state": "walking", "message": "Phase 1: Discovering interfaces..."}

    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    safe_ip = target.replace(".", "-")
    walk_file = WALKS_DIR / f"{safe_ip}_{timestamp}_neighbor_walk.txt"
    auth = build_neighbor_snmp_auth()
    walk_cmd = "snmpbulkwalk" if shutil.which("snmpbulkwalk") else "snmpwalk"
    t_start = time.time()

    try:
        # Inside the try so a filesystem error is reported as an "error"
        # status instead of leaving this target stuck at "walking".
        WALKS_DIR.mkdir(parents=True, exist_ok=True)

        # ── Phase 1: Discovery walk ──────────────────────────────────
        discovery_output = _walk_subtrees_parallel(
            walk_cmd, auth, target, NEIGHBOR_DISCOVERY_OIDS,
            status_prefix="Phase 1: Discovery "
        )

        if not discovery_output.strip():
            with neighbor_lock:
                neighbor_status[target] = {
                    "state": "error",
                    "message": "Discovery walk returned no data — check credentials",
                }
            return

        # Parse discovery data in-process to find target interface
        _ensure_script_dir_on_path()
        from cisco_parse import parse_walk_text, build_interface_index, match_rem_port_id, \
            discover_subinterfaces_stack, discover_subinterfaces_pattern

        discovery_oids = parse_walk_text(discovery_output)
        interfaces = build_interface_index(discovery_oids)
        discovery_count = len(discovery_oids)

        with neighbor_lock:
            neighbor_status[target] = {
                "state": "walking",
                "message": f"Phase 1 done: {discovery_count:,} OIDs, {len(interfaces)} interfaces. Matching...",
            }

        matched_ifindex = match_rem_port_id(interfaces, rem_port_id)

        if matched_ifindex is None:
            # Write what we have and let cisco-parse produce a best-effort result
            walk_file.write_text(discovery_output)
        else:
            # Find child interfaces (subinterfaces)
            parent_descr = interfaces.get(matched_ifindex, {}).get("ifDescr", "")
            child_indices = set()

            # ifStackTable children
            child_indices |= discover_subinterfaces_stack(discovery_oids, matched_ifindex)

            # Pattern-based children (ifDescr matching)
            if parent_descr:
                pattern_children = discover_subinterfaces_pattern(discovery_oids, parent_descr)
                child_indices |= set(pattern_children.keys())

            # One pass over the interface table: index ifDescr → ifIndexes and
            # collect Vlan{N} SVIs (IOS-XE). The index turns the BDI/BVI
            # lookup below into dict hits instead of a scan per child.
            indices_by_descr = {}
            vlan_indices = set()
            for ifidx, info in interfaces.items():
                d = info.get("ifDescr", "")
                indices_by_descr.setdefault(d, set()).add(ifidx)
                if _VLAN_SVI_RE.match(d):
                    vlan_indices.add(ifidx)

            # Also look for BDI/BVI interfaces that correlate with subinterfaces:
            # a child named "<parent>.<N>" pulls in "BDI<N>" and "BVI<N>".
            bvi_indices = set()
            for child_idx in child_indices:
                child_descr = interfaces.get(child_idx, {}).get("ifDescr", "")
                vlan_match = re.search(r"\.(\d+)$", child_descr)
                if vlan_match:
                    vlan_id = vlan_match.group(1)
                    bvi_indices |= indices_by_descr.get(f"BDI{vlan_id}", set())
                    bvi_indices |= indices_by_descr.get(f"BVI{vlan_id}", set())

            all_target_indices = {matched_ifindex} | child_indices | bvi_indices | vlan_indices

            with neighbor_lock:
                neighbor_status[target] = {
                    "state": "walking",
                    "message": f"Phase 2: Getting details for {len(all_target_indices)} interfaces...",
                }

            # ── Phase 2: Targeted snmpget ────────────────────────────
            target_oids = [
                f"{base_oid}.{ifidx}"
                for ifidx in all_target_indices
                for base_oid in NEIGHBOR_INTERFACE_OID_BASES
            ]

            phase2_output = _snmpget_batch(walk_cmd, auth, target, target_oids)

            # Also walk optics/entity subtrees (small on most devices)
            extra_output = _walk_subtrees_parallel(
                walk_cmd, auth, target, NEIGHBOR_EXTRA_OIDS,
                status_prefix="Phase 2: Optics/Entity "
            )

            # Combine all phases into one walk file
            combined = discovery_output
            if phase2_output.strip():
                combined += "\n" + phase2_output
            if extra_output.strip():
                combined += "\n" + extra_output
            walk_file.write_text(combined)

        line_count = _count_lines(walk_file)
        elapsed = round(time.time() - t_start, 1)

        if line_count == 0:
            with neighbor_lock:
                neighbor_status[target] = {
                    "state": "error",
                    "message": "Walk returned no data — check credentials",
                }
            return

        # ── Parse combined data with cisco-parse.py ──────────────────
        with neighbor_lock:
            neighbor_status[target] = {"state": "parsing", "message": "Parsing neighbor data..."}

        parse_result = subprocess.run(
            [sys.executable, str(SCRIPT_DIR / "cisco_parse.py"), str(walk_file), rem_port_id],
            capture_output=True, text=True, timeout=60,
        )
        if parse_result.returncode != 0:
            with neighbor_lock:
                neighbor_status[target] = {
                    "state": "error",
                    "message": f"Parse failed: {parse_result.stderr[:200]}",
                }
            return

        neighbor_json = walk_file.with_name(walk_file.stem + "_neighbor_monitoring.json")
        if not neighbor_json.is_file():
            with neighbor_lock:
                neighbor_status[target] = {
                    "state": "error",
                    "message": "Parser did not produce neighbor JSON",
                }
            return

        with neighbor_lock:
            latest_neighbor[target] = neighbor_json
            neighbor_status[target] = {
                "state": "complete",
                "message": f"Done — {line_count:,} OIDs ({discovery_count:,} discovery + {line_count - discovery_count:,} targeted) in {elapsed}s",
                "json_path": str(neighbor_json),
            }

    except Exception as e:
        # Top-level boundary of the worker thread: always leave a terminal
        # status behind so the target can be walked again.
        with neighbor_lock:
            neighbor_status[target] = {"state": "error", "message": str(e)[:300]}


# ── Find latest monitoring JSON ──────────────────────────────────────

def _safe_mtime(path: Path) -> float:
    """Return the modification time of ``path``, or -1.0 if it vanished."""
    try:
        return path.stat().st_mtime
    except OSError:
        return -1.0


def find_latest_json() -> Path | None:
    """Return the most recent NID *_monitoring.json in walks/.

    The file recorded by the last successful walk wins while it still
    exists; otherwise the newest matching file on disk is used. Neighbor
    results (``*_neighbor_monitoring.json``) match the same glob and are
    excluded so they are never served as the NID data.
    """
    if latest_json and latest_json.is_file():
        return latest_json
    candidates = [
        p for p in WALKS_DIR.glob("*_monitoring.json")
        if not p.name.endswith(_NEIGHBOR_JSON_SUFFIX)
    ]
    return max(candidates, key=_safe_mtime, default=None)


# ── HTTP handler ─────────────────────────────────────────────────────

class NIDHandler(BaseHTTPRequestHandler):
    """Serve viewer and handle walk API."""

    def log_message(self, format, *args):
        # Cleaner logging
        print(f"[{self.log_date_time_string()}] {format % args}")

    def _send(self, code, content_type, body):
        """Send a complete response with ``body`` given as bytes."""
        self.send_response(code)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Cache-Control", "no-store")
        self.end_headers()
        self.wfile.write(body)

    def _send_json(self, code, obj):
        """Serialize ``obj`` as JSON and send it with status ``code``."""
        self._send(code, "application/json", json.dumps(obj).encode())

    def _route(self):
        """Return the request path without query string or fragment."""
        from urllib.parse import urlsplit
        return urlsplit(self.path).path

    def _query_param(self, name):
        """Return the first value of query parameter ``name``, or None."""
        from urllib.parse import urlparse, parse_qs
        qs = parse_qs(urlparse(self.path).query)
        return qs.get(name, [None])[0]

    def _read_json_body(self):
        """Read the request body as a JSON object.

        Returns:
            The decoded dict ({} for an empty body), or None after a 400
            response has already been sent for a malformed request.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            length = -1
        if length < 0:
            self._send_json(400, {"error": "Invalid Content-Length"})
            return None
        if length == 0:
            return {}
        try:
            body = json.loads(self.rfile.read(length))
        except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
            self._send_json(400, {"error": "Request body must be valid JSON"})
            return None
        if not isinstance(body, dict):
            self._send_json(400, {"error": "Request body must be a JSON object"})
            return None
        return body

    @staticmethod
    def _text_field(body, key, default=""):
        """Return ``body[key]`` stripped; non-string values count as empty."""
        value = body.get(key, default)
        return value.strip() if isinstance(value, str) else ""

    # ── GET ────────────────────────────────────────────────────────

    def do_GET(self):
        route = self._route()
        if route == "/":
            self._serve_viewer()
        elif route == "/api/status":
            self._serve_sse()
        elif route.startswith("/api/neighbor-data"):
            self._handle_neighbor_data()
        elif route.startswith("/api/neighbor-status"):
            self._handle_neighbor_status()
        else:
            self._send(404, "text/plain", b"Not found")

    def _serve_viewer(self):
        """Build and serve the viewer HTML from latest data."""
        # Import build_html lazily to avoid circular issues at module level
        _ensure_script_dir_on_path()
        from build_nid_viewer import build_html

        data = {}
        json_path = find_latest_json()
        if json_path:
            try:
                with json_path.open() as f:
                    data = json.load(f)
            except (OSError, ValueError) as exc:
                # The file can disappear (archive) or be mid-write; serve an
                # empty viewer rather than dropping the connection.
                self.log_error("Could not load %s: %s", json_path, exc)
                data = {}

        # Merge any available neighbor data into the viewer data. Only the
        # path table is copied under the lock; disk reads happen outside it
        # so walk threads are not blocked on file I/O.
        with neighbor_lock:
            neighbor_paths = dict(latest_neighbor)

        nd = {}
        for ip, npath in neighbor_paths.items():
            if npath and npath.is_file():
                try:
                    with npath.open() as f:
                        nd[ip] = json.load(f)
                except Exception as exc:
                    # Best effort: one unreadable neighbor file must not
                    # prevent the page from rendering.
                    self.log_error("Skipping neighbor data %s: %s", npath, exc)
        if nd:
            data["neighbor_data"] = nd

        html = build_html(data)
        self._send(200, "text/html; charset=utf-8", html.encode())

    def _serve_sse(self):
        """Stream walk status as Server-Sent Events.

        An event is written whenever the status version changes (polled
        every 0.3 s). The stream ends after a "complete" or "error" status
        has been sent, or when the client disconnects.
        """
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-store")
        self.send_header("Connection", "keep-alive")
        self.end_headers()

        last_version = -1
        try:
            while True:
                status, version = get_status()
                if version != last_version:
                    msg = f"data: {json.dumps(status)}\n\n"
                    self.wfile.write(msg.encode())
                    self.wfile.flush()
                    last_version = version

                    # Close SSE after terminal states so client triggers reload
                    if status["state"] in ("complete", "error"):
                        break
                time.sleep(0.3)
        except ConnectionError:
            # Client went away (broken pipe / reset / abort); nothing to do.
            pass

    # ── POST ───────────────────────────────────────────────────────

    def do_POST(self):
        route = self._route()
        if route == "/api/walk":
            self._handle_walk()
        elif route == "/api/ping":
            self._handle_ping()
        elif route == "/api/clear":
            self._handle_clear()
        elif route == "/api/neighbor-walk":
            self._handle_neighbor_walk()
        else:
            self._send(404, "text/plain", b"Not found")

    def _handle_ping(self):
        """Ping a target IP to check reachability."""
        body = self._read_json_body()
        if body is None:
            return
        target = self._text_field(body, "target")

        if not target:
            self._send_json(400, {"error": "target IP required"})
            return

        if not _is_valid_ipv4(target):
            self._send_json(400, {"error": f"Invalid IP address: {target}"})
            return

        try:
            # NOTE(review): "-W 2" is a 2-second wait on Linux iputils; other
            # ping implementations interpret the flag differently — confirm.
            result = subprocess.run(
                ["ping", "-c", "1", "-W", "2", target],
                capture_output=True, text=True, timeout=5,
            )
            reachable = result.returncode == 0
        except (subprocess.TimeoutExpired, FileNotFoundError):
            reachable = False

        self._send_json(200, {"reachable": reachable, "target": target})

    def _handle_walk(self):
        """Start a walk in a background thread.

        Responds 400 for a malformed request, 409 when a walk is already
        running, and 200 once the worker thread has been started.
        """
        body = self._read_json_body()
        if body is None:
            return
        target = self._text_field(body, "target")
        mode = self._text_field(body, "mode", "targeted")
        policies = body.get("policies", True)

        if not target:
            self._send_json(400, {"error": "target IP required"})
            return
        if not _is_valid_ipv4(target):
            self._send_json(400, {"error": f"Invalid IP address: {target}"})
            return
        if mode not in ("targeted", "full"):
            self._send_json(400, {"error": "mode must be 'targeted' or 'full'"})
            return

        # Check-and-set in one step so two simultaneous requests cannot
        # both start a walk.
        if not _try_claim_walk("Starting walk..."):
            self._send_json(409, {"error": "Walk already in progress"})
            return

        thread = threading.Thread(target=run_walk, args=(target, mode, policies), daemon=True)
        thread.start()

        self._send_json(200, {"status": "started", "target": target, "mode": mode})

    def _handle_neighbor_walk(self):
        """Start a neighbor device walk in a background thread."""
        body = self._read_json_body()
        if body is None:
            return
        target = self._text_field(body, "target")
        rem_port_id = self._text_field(body, "remPortId")
        rem_sys_name = self._text_field(body, "remSysName")

        if not target or not rem_port_id:
            self._send_json(400, {"error": "target and remPortId required"})
            return

        if not _is_valid_ipv4(target):
            self._send_json(400, {"error": f"Invalid IP address: {target}"})
            return

        # Check if already walking this neighbor, and claim it in the same
        # critical section so a second request arriving before the worker
        # thread has started is rejected as well.
        with neighbor_lock:
            state = neighbor_status.get(target, {}).get("state")
            already_running = state in _ACTIVE_NEIGHBOR_STATES
            if not already_running:
                neighbor_status[target] = {
                    "state": "walking",
                    "message": "Starting neighbor walk...",
                }
        if already_running:
            self._send_json(409, {"error": f"Already walking {target}"})
            return

        thread = threading.Thread(
            target=run_neighbor_walk,
            args=(target, rem_port_id, rem_sys_name),
            daemon=True,
        )
        thread.start()
        self._send_json(200, {"status": "started", "target": target})

    def _handle_neighbor_data(self):
        """Return the latest neighbor monitoring JSON for a given target IP."""
        # Parse ?target=x.x.x.x from query string
        target = self._query_param("target")
        if not target:
            self._send_json(400, {"error": "target query param required"})
            return

        with neighbor_lock:
            json_path = latest_neighbor.get(target)

        if not json_path or not json_path.is_file():
            self._send_json(404, {"error": f"No neighbor data for {target}"})
            return

        try:
            with json_path.open() as f:
                data = json.load(f)
        except (OSError, ValueError) as exc:
            self._send_json(500, {"error": f"Could not read neighbor data for {target}: {str(exc)[:200]}"})
            return
        self._send_json(200, data)

    def _handle_neighbor_status(self):
        """Return the current walk status for a neighbor target."""
        target = self._query_param("target")
        if not target:
            self._send_json(400, {"error": "target query param required"})
            return

        with neighbor_lock:
            # Copy so serialization happens on a snapshot, outside the lock.
            status = dict(neighbor_status.get(target, {"state": "idle", "message": ""}))
        self._send_json(200, status)

    def _handle_clear(self):
        """Move all walk data to walks/archive/ and reset state."""
        global latest_json
        archive = WALKS_DIR / "archive"
        archive.mkdir(parents=True, exist_ok=True)

        moved = 0
        # Snapshot the listing first; renaming while iterating a live
        # directory scan is not reliable.
        for f in list(WALKS_DIR.iterdir()):
            if f.is_file():
                f.rename(archive / f.name)
                moved += 1

        latest_json = None
        # The archived files are gone from walks/, so cached neighbor results
        # are stale. Entries for walks still running are kept so the
        # duplicate-walk check keeps working for them.
        # NOTE(review): a NID walk running right now loses its walk file and
        # has its status reset here — consider rejecting with 409 instead.
        with neighbor_lock:
            latest_neighbor.clear()
            for ip in list(neighbor_status):
                if neighbor_status[ip].get("state") not in _ACTIVE_NEIGHBOR_STATES:
                    del neighbor_status[ip]
        set_status("idle")
        self._send_json(200, {"status": "cleared", "files_archived": moved})


# ── Main ─────────────────────────────────────────────────────────────

def main():
    """Run the HTTP server until interrupted with Ctrl+C."""
    # NOTE(review): binds to all interfaces although the banner advertises
    # localhost; the API is unauthenticated and spawns subprocesses.
    server = ThreadingHTTPServer(("0.0.0.0", SERVER_PORT), NIDHandler)
    print("╔═══════════════════════════════════════════════════╗")
    print("║ SNMP NID Viewer — Web Server ║")
    print(f"║ http://localhost:{SERVER_PORT:<5} ║")
    print("╚═══════════════════════════════════════════════════╝")
    print(f" SNMPv{SNMP_VERSION} | Press Ctrl+C to stop")
    print()

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down.")
    finally:
        # serve_forever() has already returned at this point, so the loop
        # needs no shutdown() signal; release the listening socket instead.
        server.server_close()


if __name__ == "__main__":
    main()