diff --git a/collectors/network_collector.py b/collectors/network_collector.py index bd5dec3..23c07ed 100644 --- a/collectors/network_collector.py +++ b/collectors/network_collector.py @@ -77,6 +77,7 @@ NETMIKO_ONLY_DRIVERS = { "brocade_nos": "extreme_nos", "brocade_vdx": "extreme_nos", "extreme_nos": "extreme_nos", + "iosxr": "cisco_xr", } # --------------------------------------------------------------------------- @@ -110,6 +111,7 @@ NAME_TO_TYPE = { r"^(Mg|MgmtEth|Management)": "1000base-t", r"^(Nu|Null)": "virtual", r"^(Po|Port-channel|port-channel)": "lag", + r"^(BE|Bundle-Ether)": "lag", r"^(BV|BVI)": "bridge", r"^(Se|Serial)": "other", # Brocade ICX / FastIron patterns @@ -475,12 +477,14 @@ def collect_netmiko_data(conn: ConnectHandler, driver: str) -> dict: def _netmiko_parse_facts(conn: ConnectHandler, driver: str) -> dict: """Parse device facts from Netmiko CLI output. - Handles both ICX/FastIron and NOS/VDX output formats. + Handles ICX/FastIron, NOS/VDX, and IOS-XR output formats. """ + is_iosxr = driver in ("iosxr", "cisco_xr") + vendor = "Cisco" if is_iosxr else "Brocade" facts = { "hostname": "", "model": "Unknown", - "vendor": "Brocade", + "vendor": vendor, "serial_number": "", "os_version": "", "fqdn": "", @@ -488,6 +492,9 @@ def _netmiko_parse_facts(conn: ConnectHandler, driver: str) -> dict: "interface_list": [], } + if is_iosxr: + return _netmiko_parse_facts_iosxr(conn, facts) + try: # Hostname from prompt (works for both ICX and NOS) # ICX: "SSH@Brocade40G-01#" → "Brocade40G-01" @@ -603,13 +610,163 @@ def _netmiko_parse_facts(conn: ConnectHandler, driver: str) -> dict: return facts +def _netmiko_parse_facts_iosxr(conn: ConnectHandler, facts: dict) -> dict: + """Parse device facts from IOS-XR CLI output via Netmiko.""" + try: + # Hostname from prompt: "RP/0/RP0/CPU0:CML-R9K-CORE-01#" → "CML-R9K-CORE-01" + prompt = conn.find_prompt().strip().rstrip("#>") + # Strip IOS-XR route-processor prefix + if ":" in prompt: + prompt = prompt.rsplit(":", 1)[-1] + if prompt: + facts["hostname"] = prompt + + # 'show version' on IOS-XR + ver_out = conn.send_command("show version") + for line in ver_out.splitlines(): + stripped = line.strip() + low = stripped.lower() + # "cisco IOS XR Software, Version 7.3.2" + if "ios xr software" in low and "version" in low: + ver_match = re.search(r"Version\s+(\S+)", stripped) + if ver_match: + facts["os_version"] = ver_match.group(1) + # "cisco IOS-XRv 9000 () processor" or "cisco 8201-32FH (Intel i686)" + elif "processor" in low and low.startswith("cisco"): + # Extract model: strip "cisco ", strip everything from "(" onward, strip "processor" + model = re.sub(r"^cisco\s+", "", stripped, flags=re.IGNORECASE) + model = re.sub(r"\s*\(.*?\)\s*", " ", model).strip() + model = re.sub(r"\s*processor\s*$", "", model, flags=re.IGNORECASE).strip() + if model: + facts["model"] = model + + # 'show inventory' for serial number and chassis PID + try: + inv_out = conn.send_command("show inventory") + current_name = "" + for line in inv_out.splitlines(): + stripped = line.strip() + # NAME: "Rack 0", DESCR: "..." + name_match = re.search(r'NAME:\s*"([^"]*)"', stripped) + if name_match: + current_name = name_match.group(1) + # PID: ..., VID: ..., SN: ... + if stripped.upper().startswith("PID:"): + pid_match = re.search(r"PID:\s*(\S+)", stripped) + sn_match = re.search(r"SN:\s*(\S+)", stripped) + # Prefer chassis-level (Rack 0) serial + if sn_match: + sn = sn_match.group(1) + if sn and sn != "N/A": + if current_name == "Rack 0" or not facts["serial_number"]: + facts["serial_number"] = sn + if pid_match and facts["model"] == "Unknown": + facts["model"] = pid_match.group(1) + except Exception: + pass + + # Hostname from running config (more reliable) + try: + host_out = conn.send_command("show running-config hostname") + for line in host_out.splitlines(): + stripped = line.strip() + if stripped.lower().startswith("hostname "): + facts["hostname"] = stripped.split(None, 1)[1].strip() + except Exception: + pass + + # Interface list from 'show ip interface brief' + try: + iface_out = conn.send_command("show ip interface brief") + for line in iface_out.splitlines(): + parts = line.split() + if not parts: + continue + name = parts[0] + # Match IOS-XR interface names + if re.match( + r"^(GigabitEthernet|TenGigE|HundredGigE|Bundle-Ether|Loopback|" + r"MgmtEth|Null|tunnel-ip|BVI)\d", + name, + ): + facts["interface_list"].append(name) + except Exception: + pass + + except Exception as exc: + log.error(" IOS-XR facts parsing failed: %s", exc) + return {} + + return facts + + +def _netmiko_parse_interfaces_iosxr(conn: ConnectHandler) -> dict: + """Parse IOS-XR interface data from 'show interfaces' output.""" + interfaces = {} + try: + output = conn.send_command("show interfaces", read_timeout=120) + current_iface = None + for line in output.splitlines(): + # IOS-XR header: "GigabitEthernet0/0/0/0 is up, line protocol is up" + # or "Bundle-Ether100 is up, line protocol is up" + # or "Loopback0 is up, line protocol is up" + # or "MgmtEth0/RP0/CPU0/0 is administratively down, line protocol is ..." + hdr = re.match( + r"^(\S+)\s+is\s+(up|down|administratively down)", + line, + ) + if hdr: + current_iface = hdr.group(1) + link_state = hdr.group(2) + proto_up = "line protocol is up" in line.lower() + interfaces[current_iface] = { + "is_up": proto_up, + "is_enabled": link_state != "administratively down", + "description": "", + "mac_address": "", + "speed": 0, + "mtu": 0, + "last_flapped": -1.0, + } + continue + + if current_iface and current_iface in interfaces: + stripped = line.strip() + low = stripped.lower() + # " Description: To-CORE-02" + if low.startswith("description:"): + interfaces[current_iface]["description"] = stripped.split(":", 1)[1].strip()[:200] + # " Hardware is GigabitEthernet, address is 5254.0012.000f" + # or " address is 5254.0012.000f" + elif "address is" in low and "internet" not in low: + mac_match = re.search(r"address is\s+([0-9a-fA-F.:]+)", stripped) + if mac_match: + interfaces[current_iface]["mac_address"] = mac_match.group(1) + # " MTU 1514 bytes, BW 1000000 Kbit/sec" + elif "mtu" in low and "bytes" in low: + mtu_match = re.search(r"MTU\s+(\d+)", stripped) + bw_match = re.search(r"BW\s+(\d+)\s*Kbit", stripped) + if mtu_match: + interfaces[current_iface]["mtu"] = int(mtu_match.group(1)) + if bw_match: + # BW is in Kbit/sec, convert to Mbps + interfaces[current_iface]["speed"] = int(bw_match.group(1)) // 1000 + except Exception as exc: + log.warning(" IOS-XR interface parsing failed: %s", exc) + + return interfaces + + def _netmiko_parse_interfaces(conn: ConnectHandler, driver: str) -> dict: """Parse interface data from CLI output. - Handles both ICX/FastIron and NOS/VDX formats. + Handles ICX/FastIron, NOS/VDX, and IOS-XR formats. Returns dict matching NAPALM get_interfaces() format: {iface_name: {is_up, is_enabled, description, mac_address, speed, mtu, last_flapped}} """ + if driver in ("iosxr", "cisco_xr"): + return _netmiko_parse_interfaces_iosxr(conn) + interfaces = {} is_icx = driver in ("ruckus_fastiron",) @@ -751,13 +908,102 @@ def _netmiko_parse_interfaces(conn: ConnectHandler, driver: str) -> dict: return interfaces +def _netmiko_parse_interfaces_ip_iosxr(conn: ConnectHandler) -> dict: + """Parse IOS-XR IP addresses from 'show ipv4 interface brief' and running config.""" + interfaces_ip: dict[str, dict] = {} + + # 'show ipv4 interface brief' on IOS-XR: + # Interface IP-Address Status Protocol Vrf-Name + # Bundle-Ether100 10.0.100.1 Up Up default + # GigabitEthernet0/0/0/0 unassigned Up Up default + # Loopback0 10.255.0.1 Up Up default + try: + output = conn.send_command("show ipv4 interface brief") + for line in output.splitlines(): + parts = line.split() + if len(parts) < 2: + continue + name = parts[0] + ip = parts[1] + # Skip header and unassigned + if ip.lower() in ("ip-address", "unassigned") or not re.match(r"\d+\.\d+\.\d+\.\d+", ip): + continue + interfaces_ip.setdefault(name, {"ipv4": {}, "ipv6": {}}) + interfaces_ip[name]["ipv4"][ip] = {"prefix_length": 24} # refined below + except Exception as exc: + log.debug(" 'show ipv4 interface brief' failed: %s", exc) + + # Get accurate prefix lengths from running config + try: + output = conn.send_command("show running-config interface", read_timeout=120) + current_iface = None + for line in output.splitlines(): + stripped = line.strip() + # "interface Bundle-Ether100" or "interface Loopback0" + iface_match = re.match(r"^interface\s+(\S+)", stripped, re.IGNORECASE) + if iface_match: + current_iface = iface_match.group(1) + continue + if stripped == "!": + current_iface = None + continue + if current_iface and "ipv4 address" in stripped.lower(): + # "ipv4 address 10.0.100.1 255.255.255.0" or "ipv4 address 10.0.100.1/24" + ip_match = re.search( + r"ipv4 address\s+(\d+\.\d+\.\d+\.\d+)[/\s]+(\d+\.[\d.]+|\d+)", + stripped, re.IGNORECASE, + ) + if ip_match: + ip = ip_match.group(1) + mask_or_prefix = ip_match.group(2) + if "." in mask_or_prefix: + prefix = sum(bin(int(x)).count("1") for x in mask_or_prefix.split(".")) + else: + prefix = int(mask_or_prefix) + interfaces_ip.setdefault(current_iface, {"ipv4": {}, "ipv6": {}}) + interfaces_ip[current_iface]["ipv4"][ip] = {"prefix_length": prefix} + except Exception as exc: + log.debug(" IOS-XR running-config IP parsing failed: %s", exc) + + # IPv6 + try: + output = conn.send_command("show ipv6 interface brief") + for line in output.splitlines(): + parts = line.split() + if not parts: + continue + # IOS-XR: "Bundle-Ether100 [Up/Up]" + # " fe80::... " + # " 2001:db8::1/64" + if not line.startswith(" ") and len(parts) >= 1: + iface_match = re.match(r"^(\S+)", line) + if iface_match: + current_iface = iface_match.group(1) + elif line.startswith(" "): + ipv6_match = re.search(r"([0-9a-fA-F:]+(?:::[0-9a-fA-F:]*)?)/(\d+)", line) + if ipv6_match and current_iface: + addr = ipv6_match.group(1) + prefix = int(ipv6_match.group(2)) + if addr.lower().startswith("fe80"): + continue # skip link-local + interfaces_ip.setdefault(current_iface, {"ipv4": {}, "ipv6": {}}) + interfaces_ip[current_iface]["ipv6"][addr] = {"prefix_length": prefix} + except Exception as exc: + log.debug(" IOS-XR IPv6 parsing failed: %s", exc) + + return interfaces_ip + + def _netmiko_parse_interfaces_ip(conn: ConnectHandler, driver: str) -> dict: """Parse IP addresses from CLI output. - Handles both ICX and NOS formats. + Handles ICX, NOS, and IOS-XR formats. Returns dict matching NAPALM get_interfaces_ip() format: {iface_name: {ipv4: {addr: {prefix_length: N}}, ipv6: {addr: {prefix_length: N}}}} """ + if driver in ("iosxr", "cisco_xr"): + return _netmiko_parse_interfaces_ip_iosxr(conn) + interfaces_ip: dict[str, dict] = {} # 'show ip interface' — works on both ICX and NOS