diff --git a/collectors/__init__.py b/collectors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/collectors/cml_collector.py b/collectors/cml_collector.py new file mode 100644 index 0000000..faf956b --- /dev/null +++ b/collectors/cml_collector.py @@ -0,0 +1,460 @@ +#!/usr/bin/env python3 +"""Cisco Modeling Labs collector for NetBox via Diode SDK. + +Syncs CML lab topology into NetBox: devices (nodes), interfaces, cables (links), +L3 addresses, and device configs. + +Usage: + python collectors/cml_collector.py --dry-run + python collectors/cml_collector.py +""" + +import argparse +import logging +import os +import re +import sys + +from virl2_client import ClientLibrary + +from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient +from netboxlabs.diode.sdk.ingester import ( + Cable, + Device, + DeviceConfig, + DeviceRole, + DeviceType, + Entity, + GenericObject, + Interface, + IPAddress, + Manufacturer, + Platform, + Site, +) + +log = logging.getLogger("cml-collector") + +# --------------------------------------------------------------------------- +# CML node definition → NetBox mappings +# --------------------------------------------------------------------------- + +NODE_DEF_TO_PLATFORM = { + "iosv": "Cisco IOS", + "iosvl2": "Cisco IOS", + "iosxrv": "Cisco IOS-XR", + "iosxrv9000": "Cisco IOS-XR", + "csr1000v": "Cisco IOS-XE", + "cat8000v": "Cisco IOS-XE", + "cat9000v": "Cisco IOS-XE", + "nxosv": "Cisco NX-OS", + "nxosv9000": "Cisco NX-OS", + "asav": "Cisco ASA", + "server": "Linux", + "ubuntu": "Ubuntu", + "alpine": "Alpine Linux", + "desktop": "Linux", + "trex": "TRex Traffic Generator", + "wan_emulator": "Linux", + "external_connector": "External Connector", + "unmanaged_switch": "Unmanaged Switch", +} + +NODE_DEF_TO_MANUFACTURER = { + "iosv": "Cisco", + "iosvl2": "Cisco", + "iosxrv": "Cisco", + "iosxrv9000": "Cisco", + "csr1000v": "Cisco", + "cat8000v": "Cisco", + "cat9000v": "Cisco", + "nxosv": "Cisco", + "nxosv9000": "Cisco", + "asav": "Cisco", +} + +NODE_DEF_TO_ROLE = { + "iosv": "Router", + "iosvl2": "Switch", + "iosxrv": "Router", + "iosxrv9000": "Router", + "csr1000v": "Router", + "cat8000v": "Router", + "cat9000v": "Switch", + "nxosv": "Switch", + "nxosv9000": "Switch", + "asav": "Firewall", + "server": "Server", + "ubuntu": "Server", + "alpine": "Server", + "desktop": "Server", + "external_connector": "Patch Panel", + "unmanaged_switch": "Switch", +} + +NODE_DEF_TO_MODEL = { + "iosv": "IOSv", + "iosvl2": "IOSvL2", + "iosxrv": "IOS-XRv", + "iosxrv9000": "IOS-XRv 9000", + "csr1000v": "CSR1000v", + "cat8000v": "Catalyst 8000V", + "cat9000v": "Catalyst 9000V", + "nxosv": "NX-OSv", + "nxosv9000": "NX-OSv 9000", + "asav": "ASAv", +} + +CML_STATE_TO_STATUS = { + "BOOTED": "active", + "STARTED": "active", + "STOPPED": "offline", + "DEFINED_ON_CORE": "planned", + "QUEUED": "planned", +} + + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + + +def load_dotenv(path: str = ".env") -> None: + if not os.path.isfile(path): + return + with open(path) as fh: + for line in fh: + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, _, val = line.partition("=") + os.environ.setdefault(key.strip(), val.strip().strip("\"'")) + + +def get_config() -> dict: + return { + "host": os.environ.get("CML_HOST", ""), + "user": os.environ.get("CML_USER", "admin"), + "password": os.environ.get("CML_PASSWORD", ""), + "lab": os.environ.get("CML_LAB", ""), + "verify_ssl": os.environ.get("CML_VERIFY_SSL", "false").lower() == "true", + "site": os.environ.get("CML_SITE", "CML"), + } + + +# --------------------------------------------------------------------------- +# Device reference helper +# --------------------------------------------------------------------------- + + +def _device_ref(name: str, node_def: str, site_name: str) -> Device: + model = NODE_DEF_TO_MODEL.get(node_def, node_def) + manufacturer = NODE_DEF_TO_MANUFACTURER.get(node_def, "Generic") + role = NODE_DEF_TO_ROLE.get(node_def, "Network Device") + return Device( + name=name, + device_type=DeviceType( + model=model, + manufacturer=Manufacturer(name=manufacturer), + ), + role=DeviceRole(name=role), + site=Site(name=site_name), + ) + + +# --------------------------------------------------------------------------- +# Entity builders +# --------------------------------------------------------------------------- + + +def build_node_entity(node, site_name: str) -> Entity: + node_def = node.node_definition or "unknown" + model = NODE_DEF_TO_MODEL.get(node_def, node_def) + manufacturer = NODE_DEF_TO_MANUFACTURER.get(node_def, "Generic") + role = NODE_DEF_TO_ROLE.get(node_def, "Network Device") + platform = NODE_DEF_TO_PLATFORM.get(node_def, node_def) + status = CML_STATE_TO_STATUS.get(node.state, "planned") + + return Entity(device=Device( + name=node.label, + device_type=DeviceType( + model=model, + manufacturer=Manufacturer(name=manufacturer), + ), + role=DeviceRole(name=role), + platform=Platform(name=platform), + site=Site(name=site_name), + status=status, + tags=["cml"], + )) + + +def build_interface_entities(node, site_name: str) -> list[Entity]: + entities = [] + node_def = node.node_definition or "unknown" + + for iface in node.interfaces(): + iface_name = iface.label or f"iface{iface.slot}" + # Map interface type from name + iface_type = "virtual" + if re.match(r"^(Gi|GigabitEthernet)", iface_name, re.IGNORECASE): + iface_type = "1000base-t" + elif re.match(r"^(Te|TenGig)", iface_name, re.IGNORECASE): + iface_type = "10gbase-x-sfpp" + elif re.match(r"^(Fa|FastEthernet)", iface_name, re.IGNORECASE): + iface_type = "100base-tx" + elif re.match(r"^(Lo|Loopback)", iface_name, re.IGNORECASE): + iface_type = "virtual" + elif re.match(r"^(eth|ens|enp)", iface_name, re.IGNORECASE): + iface_type = "1000base-t" + + entities.append(Entity(interface=Interface( + device=_device_ref(node.label, node_def, site_name), + name=iface_name, + type=iface_type, + enabled=node.state in ("BOOTED", "STARTED"), + tags=["cml"], + ))) + + return entities + + +def build_ip_entities(node, site_name: str) -> list[Entity]: + """Build IP entities from CML node L3 addresses.""" + entities = [] + node_def = node.node_definition or "unknown" + + for iface in node.interfaces(): + iface_name = iface.label or f"iface{iface.slot}" + + # CML may expose discovered L3 addresses + try: + if hasattr(iface, "discovered_ipv4") and iface.discovered_ipv4: + for addr in iface.discovered_ipv4: + if addr and not addr.startswith("127."): + ip_str = addr if "/" in addr else f"{addr}/24" + entities.append(Entity(ip_address=IPAddress( + address=ip_str, + status="active", + assigned_object_interface=Interface( + device=_device_ref(node.label, node_def, site_name), + name=iface_name, + type="virtual", + ), + tags=["cml"], + ))) + if hasattr(iface, "discovered_ipv6") and iface.discovered_ipv6: + for addr in iface.discovered_ipv6: + if addr and not addr.lower().startswith("fe80"): + ip_str = addr if "/" in addr else f"{addr}/64" + entities.append(Entity(ip_address=IPAddress( + address=ip_str, + status="active", + assigned_object_interface=Interface( + device=_device_ref(node.label, node_def, site_name), + name=iface_name, + type="virtual", + ), + tags=["cml"], + ))) + except Exception as exc: + log.debug(" IP discovery unavailable for %s:%s: %s", + node.label, iface_name, exc) + + return entities + + +def build_cable_entities(lab, site_name: str, + node_map: dict[str, str]) -> list[Entity]: + """Build Cable entities from CML links. + + node_map: {node_id: (node_label, node_definition)} + """ + entities = [] + + for link in lab.links(): + try: + iface_a = link.interface_a + iface_b = link.interface_b + node_a = iface_a.node + node_b = iface_b.node + + a_name = iface_a.label or f"iface{iface_a.slot}" + b_name = iface_b.label or f"iface{iface_b.slot}" + a_node_def = node_a.node_definition or "unknown" + b_node_def = node_b.node_definition or "unknown" + + cable = Cable( + a_terminations=[GenericObject(object_interface=Interface( + device=_device_ref(node_a.label, a_node_def, site_name), + name=a_name, + type="virtual", + ))], + b_terminations=[GenericObject(object_interface=Interface( + device=_device_ref(node_b.label, b_node_def, site_name), + name=b_name, + type="virtual", + ))], + status="connected", + tags=["cml"], + ) + entities.append(Entity(cable=cable)) + log.info(" Cable: %s:%s <-> %s:%s", + node_a.label, a_name, node_b.label, b_name) + except Exception as exc: + log.warning(" Failed to build cable for link: %s", exc) + + return entities + + +def build_config_entity(node, site_name: str) -> Entity | None: + """Build DeviceConfig entity from CML node configuration.""" + node_def = node.node_definition or "unknown" + try: + config = node.config + if not config: + return None + return Entity( + device=_device_ref(node.label, node_def, site_name), + device_config=DeviceConfig( + startup=config.encode("utf-8") if config else None, + ), + ) + except Exception as exc: + log.debug(" Config unavailable for %s: %s", node.label, exc) + return None + + +# --------------------------------------------------------------------------- +# Orchestration +# --------------------------------------------------------------------------- + + +def collect_all_entities(cfg: dict) -> list[Entity]: + host = cfg["host"] + if not host: + log.error("CML_HOST not set") + sys.exit(1) + + url = f"https://{host}" if not host.startswith("http") else host + log.info("Connecting to CML at %s...", url) + + client = ClientLibrary(url, cfg["user"], cfg["password"], + ssl_verify=cfg["verify_ssl"]) + + site_name = cfg["site"] + entities: list[Entity] = [] + + # Get labs + labs = client.all_labs() + target_lab = cfg.get("lab") + + if target_lab: + labs = [l for l in labs if l.title == target_lab or l.id == target_lab] + if not labs: + log.error("Lab '%s' not found", target_lab) + sys.exit(1) + + for lab in labs: + lab.sync() + log.info("Lab: %s (%s) — %d nodes, %d links", + lab.title, lab.state(), len(lab.nodes()), len(lab.links())) + + node_map = {} + for node in lab.nodes(): + node_def = node.node_definition or "unknown" + node_map[node.id] = (node.label, node_def) + + # Skip external connectors and unmanaged switches for device creation + if node_def in ("external_connector", "unmanaged_switch"): + log.debug(" Skipping non-device node: %s (%s)", node.label, node_def) + # Still create interface entities for cable termination + entities.extend(build_interface_entities(node, site_name)) + continue + + # Device + entities.append(build_node_entity(node, site_name)) + + # Interfaces + entities.extend(build_interface_entities(node, site_name)) + + # IPs + entities.extend(build_ip_entities(node, site_name)) + + # Config + config_entity = build_config_entity(node, site_name) + if config_entity: + entities.append(config_entity) + + # Cables from links + entities.extend(build_cable_entities(lab, site_name, node_map)) + + return entities + + +def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: + if not entities: + log.warning("No entities to ingest") + return + + target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode") + client_id = os.environ.get("DIODE_CLIENT_ID", + os.environ.get("INGESTER_CLIENT_ID", "diode-ingester")) + client_secret = os.environ.get("DIODE_CLIENT_SECRET", + os.environ.get("INGESTER_CLIENT_SECRET", "")) + + if dry_run: + log.info("DRY RUN: %d entities would be ingested", len(entities)) + for i, e in enumerate(entities): + log.info(" [%d] %s", i, e) + return + + if not client_secret: + log.error("DIODE_CLIENT_SECRET not set — cannot ingest") + sys.exit(1) + + log.info("Ingesting %d entities to %s ...", len(entities), target) + + from netboxlabs.diode.sdk.ingester import create_message_chunks + + with DiodeClient( + target=target, + client_id=client_id, + client_secret=client_secret, + app_name="cml-collector", + app_version="0.1.0", + ) as client: + chunks = create_message_chunks(entities) + for idx, chunk in enumerate(chunks): + resp = client.ingest(entities=chunk) + if resp.errors: + log.error("Chunk %d errors: %s", idx, resp.errors) + else: + log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + + +def main(): + parser = argparse.ArgumentParser(description="CML topology collector for NetBox") + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--log-level", default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"]) + parser.add_argument("--env-file", default=".env") + args = parser.parse_args() + + logging.basicConfig( + level=getattr(logging, args.log_level), + format="%(asctime)s %(name)s %(levelname)s %(message)s", + ) + + load_dotenv(args.env_file) + cfg = get_config() + + entities = collect_all_entities(cfg) + log.info("Total entities: %d", len(entities)) + + ingest_entities(entities, dry_run=args.dry_run) + log.info("Done!") + + +if __name__ == "__main__": + main() diff --git a/collectors/docker_collector.py b/collectors/docker_collector.py new file mode 100644 index 0000000..55bd38e --- /dev/null +++ b/collectors/docker_collector.py @@ -0,0 +1,362 @@ +#!/usr/bin/env python3 +"""Docker container collector for NetBox via Diode SDK. + +Discovers Docker containers across one or more hosts and ingests them +into NetBox as VirtualMachines with VMInterfaces and IPAddresses. + +Usage: + python collectors/docker_collector.py --dry-run + python collectors/docker_collector.py +""" + +import argparse +import logging +import os +import sys + +import docker + +from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient +from netboxlabs.diode.sdk.ingester import ( + Cluster, + ClusterType, + CustomFieldValue, + Device, + DeviceRole, + DeviceType, + Entity, + IPAddress, + Manufacturer, + Platform, + Site, + VirtualMachine, + VMInterface, +) + +log = logging.getLogger("docker-collector") + +# --------------------------------------------------------------------------- +# Status mappings +# --------------------------------------------------------------------------- + +CONTAINER_STATUS_MAP = { + "running": "active", + "paused": "active", + "restarting": "active", + "created": "planned", + "exited": "offline", + "dead": "failed", + "removing": "decommissioning", +} + + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + + +def load_dotenv(path: str = ".env") -> None: + if not os.path.isfile(path): + return + with open(path) as fh: + for line in fh: + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, _, val = line.partition("=") + os.environ.setdefault(key.strip(), val.strip().strip("\"'")) + + +def get_config() -> dict: + # Support multiple Docker hosts (comma-separated) + hosts_str = os.environ.get("DOCKER_HOSTS", "") + if hosts_str: + hosts = [h.strip() for h in hosts_str.split(",") if h.strip()] + else: + hosts = ["local"] # Use local Docker socket + + return { + "hosts": hosts, + "site": os.environ.get("DOCKER_SITE", "main"), + "tls_verify": os.environ.get("DOCKER_TLS_VERIFY", "false").lower() == "true", + } + + +# --------------------------------------------------------------------------- +# VM reference helper +# --------------------------------------------------------------------------- + + +def _vm_ref(name: str, cluster_name: str, site_name: str) -> VirtualMachine: + return VirtualMachine( + name=name, + site=Site(name=site_name), + cluster=Cluster( + name=cluster_name, + type=ClusterType(name="Docker"), + scope_site=Site(name=site_name), + ), + role=DeviceRole(name="Docker Container"), + ) + + +# --------------------------------------------------------------------------- +# Data collection +# --------------------------------------------------------------------------- + + +def connect_docker(host: str, tls_verify: bool = False) -> docker.DockerClient: + """Connect to a Docker host.""" + if host == "local": + return docker.from_env() + else: + tls_config = None + if host.startswith("https://") and tls_verify: + tls_config = docker.tls.TLSConfig(verify=True) + return docker.DockerClient(base_url=host, tls=tls_config) + + +def get_host_info(client: docker.DockerClient) -> dict: + """Get Docker host system info.""" + return client.info() + + +def get_containers(client: docker.DockerClient, all_containers: bool = True) -> list: + """Get all containers from Docker host.""" + return client.containers.list(all=all_containers) + + +# --------------------------------------------------------------------------- +# Entity builders +# --------------------------------------------------------------------------- + + +def build_cluster_entity(host_name: str, site_name: str) -> Entity: + """Build a Cluster entity for the Docker host.""" + return Entity(cluster=Cluster( + name=host_name, + type=ClusterType(name="Docker"), + scope_site=Site(name=site_name), + status="active", + tags=["docker"], + )) + + +def build_container_entities(container, host_name: str, + site_name: str) -> list[Entity]: + """Build VirtualMachine + VMInterface + IPAddress entities for a container.""" + entities = [] + + # Container info + name = container.name + status = CONTAINER_STATUS_MAP.get(container.status, "active") + image = container.image.tags[0] if container.image.tags else str(container.image.id)[:20] + short_id = container.short_id + + # Labels/env for metadata + labels = container.labels or {} + compose_project = labels.get("com.docker.compose.project", "") + compose_service = labels.get("com.docker.compose.service", "") + + # Custom fields + custom_fields = { + "docker_container_id": CustomFieldValue(text=short_id), + } + if compose_project: + custom_fields["docker_compose_project"] = CustomFieldValue(text=compose_project) + + # VirtualMachine entity + vm_kwargs = dict( + name=name, + status=status, + site=Site(name=site_name), + cluster=Cluster( + name=host_name, + type=ClusterType(name="Docker"), + scope_site=Site(name=site_name), + ), + role=DeviceRole(name="Docker Container"), + platform=Platform(name="Docker"), + comments=f"Image: {image}", + tags=["docker"], + custom_fields=custom_fields, + ) + + entities.append(Entity(virtual_machine=VirtualMachine(**vm_kwargs))) + + # Network interfaces and IPs + vm_ref = _vm_ref(name, host_name, site_name) + + try: + # container.attrs has full inspect data + networks = container.attrs.get("NetworkSettings", {}).get("Networks", {}) + for net_name, net_data in networks.items(): + ip = net_data.get("IPAddress", "") + mac = net_data.get("MacAddress", "") + gateway = net_data.get("Gateway", "") + prefix_len = net_data.get("IPPrefixLen", 0) + + ipv6 = net_data.get("GlobalIPv6Address", "") + ipv6_prefix = net_data.get("GlobalIPv6PrefixLen", 0) + + # VMInterface + iface_name = net_name[:64] + iface_kwargs = dict( + virtual_machine=vm_ref, + name=iface_name, + enabled=True, + tags=["docker"], + ) + if mac: + iface_kwargs["mac_address"] = mac + entities.append(Entity(vm_interface=VMInterface(**iface_kwargs))) + + # IPv4 address + if ip and ip != "0.0.0.0": + ip_str = f"{ip}/{prefix_len}" if prefix_len else f"{ip}/24" + entities.append(Entity(ip_address=IPAddress( + address=ip_str, + status="active", + assigned_object_vm_interface=VMInterface( + virtual_machine=vm_ref, + name=iface_name, + ), + tags=["docker"], + ))) + + # IPv6 address + if ipv6: + ipv6_str = f"{ipv6}/{ipv6_prefix}" if ipv6_prefix else f"{ipv6}/64" + entities.append(Entity(ip_address=IPAddress( + address=ipv6_str, + status="active", + assigned_object_vm_interface=VMInterface( + virtual_machine=vm_ref, + name=iface_name, + ), + tags=["docker"], + ))) + + except Exception as exc: + log.warning(" Network info unavailable for %s: %s", name, exc) + + return entities + + +# --------------------------------------------------------------------------- +# Orchestration +# --------------------------------------------------------------------------- + + +def collect_all_entities(cfg: dict) -> list[Entity]: + site_name = cfg["site"] + entities: list[Entity] = [] + + for host in cfg["hosts"]: + log.info("Connecting to Docker host: %s", host) + try: + client = connect_docker(host, cfg["tls_verify"]) + except Exception as exc: + log.error("Failed to connect to Docker host %s: %s", host, exc) + continue + + # Get host info for cluster name + try: + info = get_host_info(client) + host_name = info.get("Name", host) + log.info(" Host: %s (Docker %s, %d containers)", + host_name, info.get("ServerVersion", "?"), + info.get("Containers", 0)) + except Exception as exc: + log.warning(" Cannot get host info: %s", exc) + host_name = host + + # Cluster entity + entities.append(build_cluster_entity(host_name, site_name)) + + # Container entities + try: + containers = get_containers(client) + log.info(" Found %d containers", len(containers)) + for container in containers: + try: + entities.extend(build_container_entities( + container, host_name, site_name + )) + except Exception as exc: + log.error(" Failed to process container %s: %s", + container.name, exc) + except Exception as exc: + log.error(" Failed to list containers on %s: %s", host_name, exc) + + return entities + + +def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: + if not entities: + log.warning("No entities to ingest") + return + + target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode") + client_id = os.environ.get("DIODE_CLIENT_ID", + os.environ.get("INGESTER_CLIENT_ID", "diode-ingester")) + client_secret = os.environ.get("DIODE_CLIENT_SECRET", + os.environ.get("INGESTER_CLIENT_SECRET", "")) + + if dry_run: + log.info("DRY RUN: %d entities would be ingested", len(entities)) + for i, e in enumerate(entities): + log.info(" [%d] %s", i, e) + return + + if not client_secret: + log.error("DIODE_CLIENT_SECRET not set — cannot ingest") + sys.exit(1) + + log.info("Ingesting %d entities to %s ...", len(entities), target) + + from netboxlabs.diode.sdk.ingester import create_message_chunks + + with DiodeClient( + target=target, + client_id=client_id, + client_secret=client_secret, + app_name="docker-collector", + app_version="0.1.0", + ) as client: + chunks = create_message_chunks(entities) + for idx, chunk in enumerate(chunks): + resp = client.ingest(entities=chunk) + if resp.errors: + log.error("Chunk %d errors: %s", idx, resp.errors) + else: + log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + + +def main(): + parser = argparse.ArgumentParser(description="Docker container collector for NetBox") + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--all", action="store_true", + help="Include stopped containers (default: running only)") + parser.add_argument("--log-level", default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"]) + parser.add_argument("--env-file", default=".env") + args = parser.parse_args() + + logging.basicConfig( + level=getattr(logging, args.log_level), + format="%(asctime)s %(name)s %(levelname)s %(message)s", + ) + + load_dotenv(args.env_file) + cfg = get_config() + + entities = collect_all_entities(cfg) + log.info("Total entities: %d", len(entities)) + + ingest_entities(entities, dry_run=args.dry_run) + log.info("Done!") + + +if __name__ == "__main__": + main() diff --git a/collectors/inventory.yaml.example b/collectors/inventory.yaml.example new file mode 100644 index 0000000..bb2718d --- /dev/null +++ b/collectors/inventory.yaml.example @@ -0,0 +1,36 @@ +# Network Device Inventory for network_collector.py +# +# Copy this file to inventory.yaml and fill in your device details. +# Fields in 'defaults' apply to all devices unless overridden per-device. + +defaults: + site: main + role: Network Device + username: admin + password: cisco + secret: cisco # enable secret (if required) + driver: ios # NAPALM driver: ios, iosxr, eos, junos, nxos + timeout: 60 + +devices: + # Cisco IOS routers + - host: 10.10.20.1 + driver: ios + role: Router + + # Cisco IOS switches + - host: 10.10.20.55 + driver: ios + role: Switch + + # Cisco IOS-XR devices + # - host: 10.10.20.100 + # driver: iosxr + # role: Router + + # Brocade switches (requires napalm-ruckus-fastiron or use netmiko fallback) + # - host: 10.10.20.200 + # driver: ros # or use custom driver name + # role: Switch + # optional_args: + # transport: ssh diff --git a/collectors/network_collector.py b/collectors/network_collector.py new file mode 100644 index 0000000..e95b11a --- /dev/null +++ b/collectors/network_collector.py @@ -0,0 +1,1046 @@ +#!/usr/bin/env python3 +"""Network device collector for NetBox via Diode SDK. + +Discovers Cisco and Brocade network devices via NAPALM (and optionally pyATS +for CDP/OSPF/IS-IS), then ingests devices, interfaces, IPs, cables, VLANs, +VRFs, prefixes, configs, and inventory into NetBox through the Diode pipeline. + +BGP sessions are pushed to the netbox-bgp plugin API (not via Diode). + +Usage: + python collectors/network_collector.py --inventory inventory.yaml + python collectors/network_collector.py --inventory inventory.yaml --dry-run +""" + +import argparse +import json +import logging +import os +import re +import sys +from typing import Any + +import yaml +from napalm import get_network_driver + +from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient +from netboxlabs.diode.sdk.ingester import ( + ASN, + Cable, + Device, + DeviceConfig, + DeviceRole, + DeviceType, + Entity, + GenericObject, + Interface, + InventoryItem, + InventoryItemRole, + IPAddress, + Manufacturer, + Platform, + Prefix, + Site, + VRF, + VLAN, + VLANGroup, +) + +# Optional: pyATS/Genie for Cisco-specific parsing (CDP, OSPF, IS-IS) +try: + from genie.conf.base import Device as GenieDevice + from genie.libs.parser.utils import common as genie_common + + HAS_PYATS = True +except ImportError: + HAS_PYATS = False + +# Optional: requests for pushing data to NetBox plugin APIs +try: + import requests + + HAS_REQUESTS = True +except ImportError: + HAS_REQUESTS = False + +log = logging.getLogger("network-collector") + +# --------------------------------------------------------------------------- +# Interface type mapping +# --------------------------------------------------------------------------- + +SPEED_TO_TYPE = { + 10: "10base-t", + 100: "100base-tx", + 1000: "1000base-t", + 2500: "2.5gbase-t", + 5000: "5gbase-t", + 10000: "10gbase-x-sfpp", + 25000: "25gbase-x-sfp28", + 40000: "40gbase-x-qsfpp", + 50000: "50gbase-x-sfp56", + 100000: "100gbase-x-qsfp28", +} + +NAME_TO_TYPE = { + r"^(Gi|GigabitEthernet)": "1000base-t", + r"^(Te|TenGigabitEthernet|TenGigE)": "10gbase-x-sfpp", + r"^(Tw|TwentyFiveGig)": "25gbase-x-sfp28", + r"^(Fo|FortyGig|FortyGigE)": "40gbase-x-qsfpp", + r"^(Hu|HundredGig|HundredGigE)": "100gbase-x-qsfp28", + r"^(Fa|FastEthernet)": "100base-tx", + r"^(Et|Ethernet)\d": "1000base-t", + r"^(Lo|Loopback)": "virtual", + r"^(Vl|Vlan)": "virtual", + r"^(Tu|Tunnel)": "virtual", + r"^(Mg|MgmtEth|Management)": "1000base-t", + r"^(Nu|Null)": "virtual", + r"^(Po|Port-channel|port-channel)": "lag", + r"^(BV|BVI)": "bridge", + r"^(Se|Serial)": "other", +} + +DRIVER_TO_PLATFORM = { + "ios": "Cisco IOS", + "iosxr": "Cisco IOS-XR", + "eos": "Arista EOS", + "junos": "Juniper Junos", + "nxos": "Cisco NX-OS", + "nxos_ssh": "Cisco NX-OS", +} + +DRIVER_TO_MANUFACTURER = { + "ios": "Cisco", + "iosxr": "Cisco", + "eos": "Arista", + "junos": "Juniper", + "nxos": "Cisco", + "nxos_ssh": "Cisco", +} + + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + + +def load_dotenv(path: str = ".env") -> None: + """Minimal .env loader — no extra dependency.""" + if not os.path.isfile(path): + return + with open(path) as fh: + for line in fh: + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, _, val = line.partition("=") + key = key.strip() + val = val.strip().strip("\"'") + os.environ.setdefault(key, val) + + +def load_inventory(path: str) -> dict: + """Load the device inventory from a YAML file. + + Expected format: + defaults: + site: main + role: Network Device + username: admin + password: cisco + secret: cisco # enable secret + driver: ios # NAPALM driver + timeout: 60 + + devices: + - host: 10.10.20.1 + driver: ios + role: Router + - host: 10.10.20.55 + driver: ios + role: Switch + - host: 10.10.20.100 + driver: iosxr + role: Router + """ + with open(path) as fh: + data = yaml.safe_load(fh) + if not data or "devices" not in data: + raise ValueError(f"Inventory file {path} must have a 'devices' list") + return data + + +def merge_device_config(device_entry: dict, defaults: dict) -> dict: + """Merge a single device entry with defaults (device overrides defaults).""" + merged = dict(defaults) + merged.update({k: v for k, v in device_entry.items() if v is not None}) + return merged + + +# --------------------------------------------------------------------------- +# Interface type helpers +# --------------------------------------------------------------------------- + + +def map_interface_type(name: str, speed: int = 0) -> str: + """Map an interface name and/or speed to a NetBox interface type.""" + for pattern, iface_type in NAME_TO_TYPE.items(): + if re.match(pattern, name, re.IGNORECASE): + return iface_type + if speed and speed in SPEED_TO_TYPE: + return SPEED_TO_TYPE[speed] + return "other" + + +def normalize_interface_name(name: str) -> str: + """Normalize short interface names from LLDP/CDP to long form for matching.""" + abbrevs = { + "Gi": "GigabitEthernet", + "Te": "TenGigabitEthernet", + "Fa": "FastEthernet", + "Et": "Ethernet", + "Fo": "FortyGigabitEthernet", + "Hu": "HundredGigE", + "Lo": "Loopback", + "Vl": "Vlan", + "Po": "Port-channel", + "Mg": "MgmtEth", + "Tu": "Tunnel", + "Se": "Serial", + } + for short, long in abbrevs.items(): + if name.startswith(short) and not name.startswith(long): + rest = name[len(short):] + if rest and (rest[0].isdigit() or rest[0] == "/"): + return long + rest + return name + + +# --------------------------------------------------------------------------- +# NAPALM data collection +# --------------------------------------------------------------------------- + + +def connect_device(host: str, driver: str, username: str, password: str, + secret: str = "", timeout: int = 60, + optional_args: dict | None = None) -> Any: + """Connect to a device via NAPALM and return the open driver.""" + driver_cls = get_network_driver(driver) + opts = optional_args or {} + if secret: + opts.setdefault("secret", secret) + opts.setdefault("transport", "ssh") + dev = driver_cls(host, username, password, timeout=timeout, optional_args=opts) + dev.open() + return dev + + +def collect_napalm_data(dev: Any) -> dict: + """Collect all available data from a NAPALM device.""" + data = {} + + # Facts (always available) + try: + data["facts"] = dev.get_facts() + log.info(" Facts: %s (%s)", data["facts"].get("hostname"), data["facts"].get("model")) + except Exception as exc: + log.error(" get_facts() failed: %s", exc) + return data + + # Interfaces + try: + data["interfaces"] = dev.get_interfaces() + log.debug(" Interfaces: %d", len(data["interfaces"])) + except Exception as exc: + log.warning(" get_interfaces() failed: %s", exc) + + # Interface IPs (IPv4 + IPv6) + try: + data["interfaces_ip"] = dev.get_interfaces_ip() + log.debug(" Interface IPs: %d interfaces with IPs", len(data["interfaces_ip"])) + except Exception as exc: + log.warning(" get_interfaces_ip() failed: %s", exc) + + # LLDP neighbors + try: + data["lldp_neighbors"] = dev.get_lldp_neighbors_detail() + total = sum(len(v) for v in data["lldp_neighbors"].values()) + log.debug(" LLDP neighbors: %d", total) + except Exception as exc: + log.debug(" get_lldp_neighbors_detail() failed: %s", exc) + + # Config + try: + data["config"] = dev.get_config() + log.debug(" Config: running=%d bytes", len(data["config"].get("running", ""))) + except Exception as exc: + log.warning(" get_config() failed: %s", exc) + + # VLANs + try: + data["vlans"] = dev.get_vlans() + log.debug(" VLANs: %d", len(data["vlans"])) + except Exception as exc: + log.debug(" get_vlans() unavailable: %s", exc) + + # Network instances (VRFs) + try: + data["network_instances"] = dev.get_network_instances() + log.debug(" VRFs: %d", len(data["network_instances"])) + except Exception as exc: + log.debug(" get_network_instances() unavailable: %s", exc) + + # BGP neighbors + try: + data["bgp_neighbors"] = dev.get_bgp_neighbors_detail() + total = sum(len(peers) for vrf in data["bgp_neighbors"].values() for peers in vrf.values()) + log.debug(" BGP peers: %d", total) + except Exception as exc: + log.debug(" get_bgp_neighbors_detail() unavailable: %s", exc) + + return data + + +# --------------------------------------------------------------------------- +# pyATS/Genie data collection (optional) +# --------------------------------------------------------------------------- + + +def collect_pyats_data(host: str, driver: str, username: str, password: str, + secret: str = "") -> dict: + """Collect CDP, OSPF, IS-IS data via pyATS/Genie parsers.""" + if not HAS_PYATS: + return {} + + os_map = {"ios": "iosxe", "iosxr": "iosxr", "nxos": "nxos", "nxos_ssh": "nxos"} + genie_os = os_map.get(driver) + if not genie_os: + log.debug(" pyATS: no mapping for driver %s, skipping", driver) + return {} + + data = {} + try: + dev = GenieDevice(name=host, os=genie_os, credentials={ + "default": {"username": username, "password": password} + }) + dev.connect(ip=host, init_exec_commands=[], init_config_commands=[], log_stdout=False) + except Exception as exc: + log.warning(" pyATS connect failed: %s", exc) + return data + + # CDP neighbors + try: + data["cdp_neighbors"] = dev.parse("show cdp neighbors detail") + log.debug(" CDP neighbors: %d", len(data["cdp_neighbors"].get("index", {}))) + except Exception as exc: + log.debug(" CDP parse failed: %s", exc) + + # OSPF neighbors + try: + data["ospf_neighbors"] = dev.parse("show ip ospf neighbor") + log.debug(" OSPF neighbors parsed") + except Exception as exc: + log.debug(" OSPF parse unavailable: %s", exc) + + # IS-IS adjacencies + try: + data["isis_adjacencies"] = dev.parse("show isis adjacency") + log.debug(" IS-IS adjacencies parsed") + except Exception as exc: + log.debug(" IS-IS parse unavailable: %s", exc) + + # Inventory (modules, transceivers) + try: + data["inventory"] = dev.parse("show inventory") + log.debug(" Inventory parsed") + except Exception as exc: + log.debug(" Inventory parse unavailable: %s", exc) + + try: + dev.disconnect() + except Exception: + pass + + return data + + +# --------------------------------------------------------------------------- +# Device reference helper (rich references for Diode reconciler) +# --------------------------------------------------------------------------- + + +def _device_ref(hostname: str, model: str, manufacturer: str, role: str, + site_name: str) -> Device: + """Build a rich Device reference with enough context for the reconciler.""" + return Device( + name=hostname, + device_type=DeviceType( + model=model or "Unknown", + manufacturer=Manufacturer(name=manufacturer or "Unknown"), + ), + role=DeviceRole(name=role), + site=Site(name=site_name), + ) + + +# --------------------------------------------------------------------------- +# Entity builders +# --------------------------------------------------------------------------- + + +def build_device_entity(facts: dict, driver: str, role: str, site_name: str, + host: str) -> Entity: + """Build a Device entity from NAPALM facts.""" + hostname = facts.get("hostname") or host + model = facts.get("model") or "Unknown" + vendor = facts.get("vendor") or DRIVER_TO_MANUFACTURER.get(driver, "Unknown") + serial = facts.get("serial_number") or "" + os_version = facts.get("os_version") or "" + platform_name = DRIVER_TO_PLATFORM.get(driver, driver) + + return Entity(device=Device( + name=hostname, + device_type=DeviceType( + model=model, + manufacturer=Manufacturer(name=vendor), + ), + role=DeviceRole(name=role), + platform=Platform(name=platform_name), + site=Site(name=site_name), + serial=serial, + status="active", + comments=f"OS: {os_version}" if os_version else "", + tags=["network-collector"], + )) + + +def build_interface_entities(interfaces: dict, hostname: str, model: str, + manufacturer: str, role: str, + site_name: str) -> list[Entity]: + """Build Interface entities from NAPALM get_interfaces().""" + entities = [] + dev_ref = _device_ref(hostname, model, manufacturer, role, site_name) + + for name, iface_data in interfaces.items(): + speed = iface_data.get("speed", 0) + iface_type = map_interface_type(name, speed) + + iface = Interface( + device=dev_ref, + name=name, + type=iface_type, + enabled=iface_data.get("is_enabled", True), + mac_address=iface_data.get("mac_address") or "", + mtu=iface_data.get("mtu") or 0, + speed=speed * 1000 if speed else 0, # NAPALM Mbps → NetBox Kbps + description=iface_data.get("description") or "", + tags=["network-collector"], + ) + entities.append(Entity(interface=iface)) + + return entities + + +def build_ip_entities(interfaces_ip: dict, hostname: str, model: str, + manufacturer: str, role: str, + site_name: str) -> list[Entity]: + """Build IPAddress entities from NAPALM get_interfaces_ip().""" + entities = [] + dev_ref = _device_ref(hostname, model, manufacturer, role, site_name) + + for iface_name, af_data in interfaces_ip.items(): + iface_type = map_interface_type(iface_name) + + for af in ("ipv4", "ipv6"): + addrs = af_data.get(af, {}) + for addr, meta in addrs.items(): + prefix_len = meta.get("prefix_length", 32 if af == "ipv4" else 128) + ip_str = f"{addr}/{prefix_len}" + + # Skip link-local IPv6 + if af == "ipv6" and addr.lower().startswith("fe80"): + continue + + entities.append(Entity(ip_address=IPAddress( + address=ip_str, + status="active", + assigned_object_interface=Interface( + device=dev_ref, + name=iface_name, + type=iface_type, + ), + tags=["network-collector"], + ))) + + return entities + + +def build_vlan_entities(vlans: dict, site_name: str) -> list[Entity]: + """Build VLAN entities from NAPALM get_vlans().""" + entities = [] + for vid_str, vlan_data in vlans.items(): + vid = int(vid_str) + if vid in (0, 4095): + continue + name = vlan_data.get("name") or f"VLAN{vid}" + entities.append(Entity(vlan=VLAN( + vid=vid, + name=name, + site=Site(name=site_name), + status="active", + tags=["network-collector"], + ))) + return entities + + +def build_vrf_entities(network_instances: dict) -> list[Entity]: + """Build VRF entities from NAPALM get_network_instances().""" + entities = [] + for vrf_name, vrf_data in network_instances.items(): + if vrf_name in ("default", "global"): + continue + rd = "" + if vrf_data.get("state", {}).get("route_distinguisher"): + rd = vrf_data["state"]["route_distinguisher"] + entities.append(Entity(vrf=VRF( + name=vrf_name, + rd=rd or None, + tags=["network-collector"], + ))) + return entities + + +def build_prefix_entities(interfaces_ip: dict, site_name: str) -> list[Entity]: + """Build Prefix entities from discovered interface IPs.""" + import ipaddress + seen = set() + entities = [] + + for iface_name, af_data in interfaces_ip.items(): + for af in ("ipv4", "ipv6"): + for addr, meta in af_data.get(af, {}).items(): + prefix_len = meta.get("prefix_length", 32 if af == "ipv4" else 128) + try: + network = ipaddress.ip_network(f"{addr}/{prefix_len}", strict=False) + except ValueError: + continue + # Skip host routes and link-local + if af == "ipv4" and prefix_len >= 31: + continue + if af == "ipv6" and (prefix_len >= 127 or addr.lower().startswith("fe80")): + continue + prefix_str = str(network) + if prefix_str not in seen: + seen.add(prefix_str) + entities.append(Entity(prefix=Prefix( + prefix=prefix_str, + scope_site=Site(name=site_name), + status="active", + tags=["network-collector"], + ))) + + return entities + + +def build_config_entity(config: dict, hostname: str, model: str, + manufacturer: str, role: str, + site_name: str) -> Entity | None: + """Build a DeviceConfig entity from NAPALM get_config().""" + running = config.get("running", "") + startup = config.get("startup", "") + if not running and not startup: + return None + dev_ref = _device_ref(hostname, model, manufacturer, role, site_name) + return Entity(device=dev_ref, device_config=DeviceConfig( + running=running.encode("utf-8") if running else None, + startup=startup.encode("utf-8") if startup else None, + )) + + +def build_inventory_entities(inventory: dict, hostname: str, model: str, + manufacturer: str, role: str, + site_name: str) -> list[Entity]: + """Build InventoryItem entities from pyATS show inventory.""" + entities = [] + dev_ref = _device_ref(hostname, model, manufacturer, role, site_name) + + # pyATS inventory format varies by platform; handle the common structure + items = inventory.get("main", {}).get("chassis", {}) + if not items: + items = inventory + for item_name, item_data in items.items(): + if isinstance(item_data, dict): + pid = item_data.get("pid") or item_data.get("name") or "" + sn = item_data.get("sn") or "" + desc = item_data.get("descr") or item_data.get("description") or "" + entities.append(Entity(inventory_item=InventoryItem( + device=dev_ref, + name=str(item_name)[:64], + part_id=str(pid)[:50] if pid else None, + serial=str(sn)[:50] if sn else None, + description=str(desc)[:200] if desc else None, + discovered=True, + tags=["network-collector"], + ))) + + return entities + + +# --------------------------------------------------------------------------- +# Cable discovery from LLDP/CDP +# --------------------------------------------------------------------------- + + +def build_cable_entities_from_lldp( + lldp_all: dict[str, dict], + site_name: str, + device_models: dict[str, tuple[str, str, str]], +) -> list[Entity]: + """Build Cable entities from LLDP neighbor data collected from all devices. + + lldp_all: {hostname: {local_iface: [{remote_system_name, remote_port, ...}]}} + device_models: {hostname: (model, manufacturer, role)} + + Deduplication: each link appears on both ends. We sort the pair and keep + only one Cable per unique (deviceA:portA, deviceB:portB) tuple. + """ + seen_links: set[tuple] = set() + entities = [] + + for local_host, iface_neighbors in lldp_all.items(): + local_info = device_models.get(local_host, ("Unknown", "Unknown", "Network Device")) + + for local_iface, neighbors in iface_neighbors.items(): + for neighbor in neighbors: + remote_host = ( + neighbor.get("remote_system_name") + or neighbor.get("remote_system_description", "") + ).split(".")[0] # Strip FQDN + remote_port = neighbor.get("remote_port") or neighbor.get("remote_port_description", "") + + if not remote_host or not remote_port: + continue + + # Normalize interface names for matching + local_norm = normalize_interface_name(local_iface) + remote_norm = normalize_interface_name(remote_port) + + # Deduplicate: sorted link key + link_a = f"{local_host}:{local_norm}" + link_b = f"{remote_host}:{remote_norm}" + link_key = tuple(sorted([link_a, link_b])) + if link_key in seen_links: + continue + seen_links.add(link_key) + + # Build device references for both ends + remote_info = device_models.get( + remote_host, ("Unknown", "Unknown", "Network Device") + ) + + a_dev = _device_ref(local_host, *local_info, site_name) + b_dev = _device_ref(remote_host, *remote_info, site_name) + + a_iface_type = map_interface_type(local_norm) + b_iface_type = map_interface_type(remote_norm) + + cable = Cable( + a_terminations=[GenericObject(object_interface=Interface( + device=a_dev, + name=local_norm, + type=a_iface_type, + ))], + b_terminations=[GenericObject(object_interface=Interface( + device=b_dev, + name=remote_norm, + type=b_iface_type, + ))], + status="connected", + tags=["lldp-discovered"], + ) + entities.append(Entity(cable=cable)) + log.info(" Cable: %s:%s <-> %s:%s", + local_host, local_norm, remote_host, remote_norm) + + return entities + + +def build_cable_entities_from_cdp( + cdp_all: dict[str, dict], + site_name: str, + device_models: dict[str, tuple[str, str, str]], + existing_links: set[tuple] | None = None, +) -> list[Entity]: + """Build Cable entities from CDP neighbor data (pyATS parsed). + + cdp_all: {hostname: pyats_parsed_cdp_output} + Only creates cables for links NOT already discovered via LLDP. + """ + if existing_links is None: + existing_links = set() + entities = [] + + for local_host, cdp_data in cdp_all.items(): + local_info = device_models.get(local_host, ("Unknown", "Unknown", "Network Device")) + + # pyATS CDP format: {"index": {1: {device_id, local_interface, port_id, ...}}} + for idx, entry in cdp_data.get("index", {}).items(): + remote_host = (entry.get("device_id") or "").split(".")[0] + local_iface = entry.get("local_interface") or "" + remote_port = entry.get("port_id") or "" + + if not remote_host or not local_iface or not remote_port: + continue + + local_norm = normalize_interface_name(local_iface) + remote_norm = normalize_interface_name(remote_port) + + link_a = f"{local_host}:{local_norm}" + link_b = f"{remote_host}:{remote_norm}" + link_key = tuple(sorted([link_a, link_b])) + if link_key in existing_links: + continue + existing_links.add(link_key) + + remote_info = device_models.get( + remote_host, ("Unknown", "Unknown", "Network Device") + ) + + a_dev = _device_ref(local_host, *local_info, site_name) + b_dev = _device_ref(remote_host, *remote_info, site_name) + + cable = Cable( + a_terminations=[GenericObject(object_interface=Interface( + device=a_dev, + name=local_norm, + type=map_interface_type(local_norm), + ))], + b_terminations=[GenericObject(object_interface=Interface( + device=b_dev, + name=remote_norm, + type=map_interface_type(remote_norm), + ))], + status="connected", + tags=["cdp-discovered"], + ) + entities.append(Entity(cable=cable)) + log.info(" Cable (CDP): %s:%s <-> %s:%s", + local_host, local_norm, remote_host, remote_norm) + + return entities + + +# --------------------------------------------------------------------------- +# NetBox plugin API pushers (BGP, OSPF, IS-IS) +# --------------------------------------------------------------------------- + + +def push_bgp_to_netbox(bgp_data: dict, hostname: str, netbox_url: str, + netbox_token: str, site_name: str) -> None: + """Push BGP neighbor data to the netbox-bgp plugin API.""" + if not HAS_REQUESTS or not bgp_data: + return + + headers = {"Authorization": f"Bearer {netbox_token}", "Content-Type": "application/json"} + base = netbox_url.rstrip("/") + + # First look up the local device in NetBox to get its ID + resp = requests.get(f"{base}/api/dcim/devices/", params={"name": hostname}, + headers=headers, timeout=10) + if resp.status_code != 200: + log.warning(" BGP push: cannot find device %s in NetBox", hostname) + return + devices = resp.json().get("results", []) + if not devices: + log.warning(" BGP push: device %s not found in NetBox", hostname) + return + local_device_id = devices[0]["id"] + + for vrf_name, peers in bgp_data.items(): + for peer_ip, peer_list in peers.items(): + for peer in peer_list: + local_as = peer.get("local_as") + remote_as = peer.get("remote_as") + remote_id = peer.get("remote_id", "") + + if not local_as or not remote_as: + continue + + session_data = { + "name": f"{hostname} <-> {peer_ip}", + "device": local_device_id, + "local_address": None, # Would need to resolve + "remote_address": None, + "local_as": {"asn": local_as}, + "remote_as": {"asn": remote_as}, + "status": "active" if peer.get("is_enabled") else "offline", + "description": f"Auto-discovered by network-collector", + } + + try: + resp = requests.post( + f"{base}/api/plugins/bgp/sessions/", + headers=headers, + json=session_data, + timeout=10, + ) + if resp.status_code in (200, 201): + log.info(" BGP session created: %s AS%s <-> %s AS%s", + hostname, local_as, peer_ip, remote_as) + elif resp.status_code == 400 and "already exists" in resp.text.lower(): + log.debug(" BGP session already exists: %s <-> %s", hostname, peer_ip) + else: + log.warning(" BGP push failed (%d): %s", resp.status_code, resp.text[:200]) + except Exception as exc: + log.warning(" BGP push error: %s", exc) + + +# --------------------------------------------------------------------------- +# Orchestration +# --------------------------------------------------------------------------- + + +def collect_all_entities(inventory: dict, env_file: str = ".env") -> tuple[list[Entity], dict]: + """Walk all devices in inventory, collect data, build entities.""" + defaults = inventory.get("defaults", {}) + site_name = defaults.get("site", "main") + entities: list[Entity] = [] + lldp_all: dict[str, dict] = {} + cdp_all: dict[str, dict] = {} + device_models: dict[str, tuple[str, str, str]] = {} + bgp_all: dict[str, dict] = {} + + for dev_entry in inventory["devices"]: + cfg = merge_device_config(dev_entry, defaults) + host = cfg["host"] + driver = cfg.get("driver", "ios") + role = cfg.get("role", "Network Device") + username = cfg.get("username", "admin") + password = cfg.get("password", "") + secret = cfg.get("secret", "") + timeout = int(cfg.get("timeout", 60)) + optional_args = cfg.get("optional_args", {}) + + log.info("Connecting to %s (driver=%s, role=%s)...", host, driver, role) + + # --- NAPALM collection --- + try: + dev = connect_device(host, driver, username, password, secret, + timeout, optional_args) + except Exception as exc: + log.error("Failed to connect to %s: %s", host, exc) + continue + + try: + napalm_data = collect_napalm_data(dev) + finally: + try: + dev.close() + except Exception: + pass + + if not napalm_data.get("facts"): + log.error("No facts for %s, skipping", host) + continue + + facts = napalm_data["facts"] + hostname = facts.get("hostname") or host + model = facts.get("model") or "Unknown" + vendor = facts.get("vendor") or DRIVER_TO_MANUFACTURER.get(driver, "Unknown") + + # Track device info for cable building + device_models[hostname] = (model, vendor, role) + + # Device entity + entities.append(build_device_entity(facts, driver, role, site_name, host)) + + # Interface entities + if napalm_data.get("interfaces"): + entities.extend(build_interface_entities( + napalm_data["interfaces"], hostname, model, vendor, role, site_name + )) + + # IP entities (IPv4 + IPv6) + if napalm_data.get("interfaces_ip"): + entities.extend(build_ip_entities( + napalm_data["interfaces_ip"], hostname, model, vendor, role, site_name + )) + # Prefix entities from discovered IPs + entities.extend(build_prefix_entities( + napalm_data["interfaces_ip"], site_name + )) + + # VLAN entities + if napalm_data.get("vlans"): + entities.extend(build_vlan_entities(napalm_data["vlans"], site_name)) + + # VRF entities + if napalm_data.get("network_instances"): + entities.extend(build_vrf_entities(napalm_data["network_instances"])) + + # Config entity + if napalm_data.get("config"): + config_entity = build_config_entity( + napalm_data["config"], hostname, model, vendor, role, site_name + ) + if config_entity: + entities.append(config_entity) + + # LLDP neighbors (saved for cable building later) + if napalm_data.get("lldp_neighbors"): + lldp_all[hostname] = napalm_data["lldp_neighbors"] + + # BGP data (saved for plugin API push later) + if napalm_data.get("bgp_neighbors"): + bgp_all[hostname] = napalm_data["bgp_neighbors"] + + # --- pyATS collection (optional) --- + if HAS_PYATS and driver in ("ios", "iosxr", "nxos", "nxos_ssh"): + log.info(" Running pyATS parsers...") + pyats_data = collect_pyats_data(host, driver, username, password, secret) + + if pyats_data.get("cdp_neighbors"): + cdp_all[hostname] = pyats_data["cdp_neighbors"] + + if pyats_data.get("inventory"): + entities.extend(build_inventory_entities( + pyats_data["inventory"], hostname, model, vendor, role, site_name + )) + + # --- Cable entities from LLDP --- + if lldp_all: + log.info("Building cable entities from LLDP data...") + cable_entities = build_cable_entities_from_lldp(lldp_all, site_name, device_models) + entities.extend(cable_entities) + + # Extract seen links for CDP dedup + seen_links = set() + for local_host, iface_neighbors in lldp_all.items(): + for local_iface, neighbors in iface_neighbors.items(): + for neighbor in neighbors: + remote_host = (neighbor.get("remote_system_name") or "").split(".")[0] + remote_port = neighbor.get("remote_port") or "" + if remote_host and remote_port: + local_norm = normalize_interface_name(local_iface) + remote_norm = normalize_interface_name(remote_port) + link_key = tuple(sorted([ + f"{local_host}:{local_norm}", + f"{remote_host}:{remote_norm}", + ])) + seen_links.add(link_key) + else: + seen_links = set() + + # --- Cable entities from CDP (only new links) --- + if cdp_all: + log.info("Building cable entities from CDP data...") + cdp_cable_entities = build_cable_entities_from_cdp( + cdp_all, site_name, device_models, seen_links + ) + entities.extend(cdp_cable_entities) + + return entities, bgp_all + + +def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: + """Send entities to Diode (or dry-run print them).""" + if not entities: + log.warning("No entities to ingest") + return + + target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode") + client_id = os.environ.get("DIODE_CLIENT_ID", + os.environ.get("INGESTER_CLIENT_ID", "diode-ingester")) + client_secret = os.environ.get("DIODE_CLIENT_SECRET", + os.environ.get("INGESTER_CLIENT_SECRET", "")) + + if dry_run: + log.info("DRY RUN: %d entities would be ingested", len(entities)) + for i, e in enumerate(entities): + log.info(" [%d] %s", i, e) + return + + if not client_secret: + log.error("DIODE_CLIENT_SECRET not set — cannot ingest") + sys.exit(1) + + log.info("Ingesting %d entities to %s ...", len(entities), target) + + from netboxlabs.diode.sdk.ingester import create_message_chunks + + with DiodeClient( + target=target, + client_id=client_id, + client_secret=client_secret, + app_name="network-collector", + app_version="0.1.0", + ) as client: + chunks = create_message_chunks(entities) + for idx, chunk in enumerate(chunks): + resp = client.ingest(entities=chunk) + if resp.errors: + log.error("Chunk %d errors: %s", idx, resp.errors) + else: + log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def main(): + parser = argparse.ArgumentParser(description="Network device collector for NetBox") + parser.add_argument("--inventory", "-i", required=True, + help="Path to device inventory YAML file") + parser.add_argument("--dry-run", action="store_true", + help="Collect data but don't ingest") + parser.add_argument("--log-level", default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"]) + parser.add_argument("--env-file", default=".env", + help="Path to .env file (default: .env)") + parser.add_argument("--no-bgp-push", action="store_true", + help="Skip pushing BGP data to netbox-bgp plugin") + parser.add_argument("--no-pyats", action="store_true", + help="Skip pyATS/Genie collection even if available") + args = parser.parse_args() + + logging.basicConfig( + level=getattr(logging, args.log_level), + format="%(asctime)s %(name)s %(levelname)s %(message)s", + ) + + load_dotenv(args.env_file) + + if args.no_pyats: + global HAS_PYATS + HAS_PYATS = False + + # Load inventory + inventory = load_inventory(args.inventory) + log.info("Loaded %d devices from inventory", len(inventory["devices"])) + + # Collect and build entities + entities, bgp_all = collect_all_entities(inventory, args.env_file) + log.info("Total entities: %d", len(entities)) + + # Ingest via Diode + ingest_entities(entities, dry_run=args.dry_run) + + # Push BGP sessions to netbox-bgp plugin API + if not args.no_bgp_push and not args.dry_run and bgp_all: + netbox_url = os.environ.get("NETBOX_URL", "http://172.19.77.160:8000") + netbox_token = os.environ.get("NETBOX_API_TOKEN", "") + if netbox_token: + for hostname, bgp_data in bgp_all.items(): + log.info("Pushing BGP data for %s to netbox-bgp...", hostname) + push_bgp_to_netbox(bgp_data, hostname, netbox_url, netbox_token, + inventory.get("defaults", {}).get("site", "main")) + else: + log.warning("NETBOX_API_TOKEN not set, skipping BGP plugin push") + + log.info("Done!") + + +if __name__ == "__main__": + main() diff --git a/collectors/observium_collector.py b/collectors/observium_collector.py new file mode 100644 index 0000000..47d7930 --- /dev/null +++ b/collectors/observium_collector.py @@ -0,0 +1,382 @@ +#!/usr/bin/env python3 +"""Observium collector for NetBox via Diode SDK. + +Pulls device, port, and IP data from the Observium REST API for brownfield +import into NetBox. Adds Observium device IDs as custom fields for cross-referencing. + +Note: Observium API requires Professional or Enterprise edition. +Community Edition users can skip this collector. + +Usage: + python collectors/observium_collector.py --dry-run + python collectors/observium_collector.py +""" + +import argparse +import logging +import os +import sys + +import requests +from requests.auth import HTTPBasicAuth + +from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient +from netboxlabs.diode.sdk.ingester import ( + CustomFieldValue, + Device, + DeviceRole, + DeviceType, + Entity, + Interface, + IPAddress, + Manufacturer, + Platform, + Site, + VLAN, +) + +log = logging.getLogger("observium-collector") + +# --------------------------------------------------------------------------- +# Observium → NetBox mappings +# --------------------------------------------------------------------------- + +OBSERVIUM_STATUS_MAP = { + "1": "active", # up + "0": "offline", # down +} + +IF_TYPE_MAP = { + "ethernetCsmacd": "1000base-t", + "gigabitEthernet": "1000base-t", + "fastEthernet": "100base-tx", + "propVirtual": "virtual", + "l3ipvlan": "virtual", + "tunnel": "virtual", + "softwareLoopback": "virtual", + "ieee8023adLag": "lag", + "bridge": "bridge", + "other": "other", +} + +OS_TO_PLATFORM = { + "ios": "Cisco IOS", + "iosxe": "Cisco IOS-XE", + "iosxr": "Cisco IOS-XR", + "nxos": "Cisco NX-OS", + "junos": "Juniper Junos", + "linux": "Linux", + "vmware": "VMware ESXi", + "ironware": "Brocade IronWare", + "fastiron": "Brocade FastIron", + "windows": "Windows", + "freebsd": "FreeBSD", + "proxmox": "Proxmox VE", +} + +OS_TO_MANUFACTURER = { + "ios": "Cisco", + "iosxe": "Cisco", + "iosxr": "Cisco", + "nxos": "Cisco", + "junos": "Juniper", + "ironware": "Brocade", + "fastiron": "Brocade", +} + +OS_TO_ROLE = { + "ios": "Router", + "iosxe": "Router", + "iosxr": "Router", + "nxos": "Switch", + "junos": "Router", + "ironware": "Switch", + "fastiron": "Switch", + "linux": "Server", + "vmware": "Hypervisor", + "windows": "Server", + "proxmox": "Hypervisor", +} + + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + + +def load_dotenv(path: str = ".env") -> None: + if not os.path.isfile(path): + return + with open(path) as fh: + for line in fh: + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, _, val = line.partition("=") + os.environ.setdefault(key.strip(), val.strip().strip("\"'")) + + +def get_config() -> dict: + return { + "url": os.environ.get("OBSERVIUM_URL", ""), + "user": os.environ.get("OBSERVIUM_USER", "admin"), + "password": os.environ.get("OBSERVIUM_PASSWORD", ""), + "site": os.environ.get("OBSERVIUM_SITE", "main"), + "default_role": os.environ.get("OBSERVIUM_DEFAULT_ROLE", "Network Device"), + } + + +# --------------------------------------------------------------------------- +# API helpers +# --------------------------------------------------------------------------- + + +def api_get(base_url: str, endpoint: str, auth: HTTPBasicAuth, + params: dict | None = None) -> dict: + """Make a GET request to the Observium API.""" + url = f"{base_url}/{endpoint}" + resp = requests.get(url, auth=auth, params=params, timeout=30, verify=False) + resp.raise_for_status() + return resp.json() + + +# --------------------------------------------------------------------------- +# Device reference helper +# --------------------------------------------------------------------------- + + +def _device_ref(name: str, model: str, manufacturer: str, role: str, + site_name: str) -> Device: + return Device( + name=name, + device_type=DeviceType( + model=model, + manufacturer=Manufacturer(name=manufacturer), + ), + role=DeviceRole(name=role), + site=Site(name=site_name), + ) + + +# --------------------------------------------------------------------------- +# Data collection and entity building +# --------------------------------------------------------------------------- + + +def collect_all_entities(cfg: dict) -> list[Entity]: + base_url = cfg["url"].rstrip("/") + if not base_url: + log.error("OBSERVIUM_URL not set") + sys.exit(1) + + auth = HTTPBasicAuth(cfg["user"], cfg["password"]) + site_name = cfg["site"] + default_role = cfg["default_role"] + entities: list[Entity] = [] + + # --- Devices --- + log.info("Fetching devices from Observium...") + try: + devices_resp = api_get(base_url, "devices", auth) + except Exception as exc: + log.error("Failed to fetch devices: %s", exc) + sys.exit(1) + + devices = devices_resp.get("devices", {}) + if isinstance(devices, list): + devices = {str(i): d for i, d in enumerate(devices)} + log.info("Found %d devices", len(devices)) + + device_id_to_name = {} + + for dev_id, dev_data in devices.items(): + hostname = dev_data.get("hostname") or dev_data.get("sysName") or f"device-{dev_id}" + device_id_to_name[dev_id] = hostname + + os_type = dev_data.get("os") or "" + model = dev_data.get("hardware") or "Unknown" + vendor = dev_data.get("vendor") or OS_TO_MANUFACTURER.get(os_type, "Unknown") + serial = dev_data.get("serial") or "" + status = "active" if dev_data.get("status") == "1" else "offline" + role = OS_TO_ROLE.get(os_type, default_role) + platform = OS_TO_PLATFORM.get(os_type) + + custom_fields = { + "observium_device_id": CustomFieldValue(text=str(dev_id)), + } + + device_kwargs = dict( + name=hostname, + device_type=DeviceType( + model=model, + manufacturer=Manufacturer(name=vendor), + ), + role=DeviceRole(name=role), + site=Site(name=site_name), + serial=serial[:50] if serial else "", + status=status, + tags=["observium"], + custom_fields=custom_fields, + ) + if platform: + device_kwargs["platform"] = Platform(name=platform) + + entities.append(Entity(device=Device(**device_kwargs))) + + # --- Ports (interfaces) --- + log.info("Fetching ports from Observium...") + try: + ports_resp = api_get(base_url, "ports", auth) + ports = ports_resp.get("ports", {}) + if isinstance(ports, list): + ports = {str(i): p for i, p in enumerate(ports)} + log.info("Found %d ports", len(ports)) + except Exception as exc: + log.warning("Failed to fetch ports: %s", exc) + ports = {} + + port_id_to_info = {} + + for port_id, port_data in ports.items(): + dev_id = str(port_data.get("device_id", "")) + hostname = device_id_to_name.get(dev_id) + if not hostname: + continue + + iface_name = port_data.get("ifName") or port_data.get("port_label") or f"port{port_id}" + if_type = port_data.get("ifType") or "other" + mac = port_data.get("ifPhysAddress") or "" + speed = int(port_data.get("ifSpeed", 0) or 0) // 1000000 # bps → Mbps + mtu = int(port_data.get("ifMtu", 0) or 0) + description = port_data.get("ifAlias") or "" + enabled = port_data.get("ifAdminStatus") == "up" + + iface_type = IF_TYPE_MAP.get(if_type, "other") + if iface_type == "other" and speed: + from collectors.network_collector import SPEED_TO_TYPE + iface_type = SPEED_TO_TYPE.get(speed, "other") + + # Look up device model for reference + dev_data = devices.get(dev_id, {}) + os_type = dev_data.get("os") or "" + model = dev_data.get("hardware") or "Unknown" + vendor = dev_data.get("vendor") or OS_TO_MANUFACTURER.get(os_type, "Unknown") + role = OS_TO_ROLE.get(os_type, default_role) + + dev_ref = _device_ref(hostname, model, vendor, role, site_name) + + entities.append(Entity(interface=Interface( + device=dev_ref, + name=iface_name[:64], + type=iface_type, + mac_address=mac, + mtu=mtu, + speed=speed * 1000 if speed else 0, # Mbps → Kbps + enabled=enabled, + description=description[:200] if description else "", + tags=["observium"], + ))) + + port_id_to_info[port_id] = (hostname, iface_name, iface_type, model, vendor, role) + + # --- IP addresses --- + log.info("Fetching IP addresses from Observium...") + try: + # Observium may expose IPs through the ports/addresses endpoint + for port_id, (hostname, iface_name, iface_type, model, vendor, role) in port_id_to_info.items(): + try: + addr_resp = api_get(base_url, f"ports/{port_id}/ip", auth) + addresses = addr_resp.get("addresses", {}) + if isinstance(addresses, list): + addresses = {str(i): a for i, a in enumerate(addresses)} + + for addr_id, addr_data in addresses.items(): + ip = addr_data.get("ipv4_address") or addr_data.get("ipv6_address") or "" + prefix_len = addr_data.get("ipv4_prefixlen") or addr_data.get("ipv6_prefixlen") or "" + if ip and prefix_len: + ip_str = f"{ip}/{prefix_len}" + dev_ref = _device_ref(hostname, model, vendor, role, site_name) + entities.append(Entity(ip_address=IPAddress( + address=ip_str, + status="active", + assigned_object_interface=Interface( + device=dev_ref, + name=iface_name, + type=iface_type, + ), + tags=["observium"], + ))) + except Exception: + pass # IP endpoint may not be available for all ports + except Exception as exc: + log.warning("IP address collection failed: %s", exc) + + return entities + + +def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: + if not entities: + log.warning("No entities to ingest") + return + + target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode") + client_id = os.environ.get("DIODE_CLIENT_ID", + os.environ.get("INGESTER_CLIENT_ID", "diode-ingester")) + client_secret = os.environ.get("DIODE_CLIENT_SECRET", + os.environ.get("INGESTER_CLIENT_SECRET", "")) + + if dry_run: + log.info("DRY RUN: %d entities would be ingested", len(entities)) + for i, e in enumerate(entities): + log.info(" [%d] %s", i, e) + return + + if not client_secret: + log.error("DIODE_CLIENT_SECRET not set — cannot ingest") + sys.exit(1) + + log.info("Ingesting %d entities to %s ...", len(entities), target) + + from netboxlabs.diode.sdk.ingester import create_message_chunks + + with DiodeClient( + target=target, + client_id=client_id, + client_secret=client_secret, + app_name="observium-collector", + app_version="0.1.0", + ) as client: + chunks = create_message_chunks(entities) + for idx, chunk in enumerate(chunks): + resp = client.ingest(entities=chunk) + if resp.errors: + log.error("Chunk %d errors: %s", idx, resp.errors) + else: + log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + + +def main(): + parser = argparse.ArgumentParser(description="Observium collector for NetBox") + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--log-level", default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"]) + parser.add_argument("--env-file", default=".env") + args = parser.parse_args() + + logging.basicConfig( + level=getattr(logging, args.log_level), + format="%(asctime)s %(name)s %(levelname)s %(message)s", + ) + + load_dotenv(args.env_file) + cfg = get_config() + + entities = collect_all_entities(cfg) + log.info("Total entities: %d", len(entities)) + + ingest_entities(entities, dry_run=args.dry_run) + log.info("Done!") + + +if __name__ == "__main__": + main() diff --git a/collectors/vmware_collector.py b/collectors/vmware_collector.py new file mode 100644 index 0000000..42d2c0f --- /dev/null +++ b/collectors/vmware_collector.py @@ -0,0 +1,548 @@ +#!/usr/bin/env python3 +"""VMware vSphere collector for NetBox via Diode SDK. + +Discovers ESXi hosts, VMs, interfaces, IPs, and disks from a vCenter +or standalone ESXi host and ingests them into NetBox via the Diode pipeline. + +Usage: + python collectors/vmware_collector.py --dry-run + python collectors/vmware_collector.py +""" + +import argparse +import atexit +import logging +import os +import re +import ssl +import sys + +from pyVim.connect import SmartConnect, Disconnect +from pyVmomi import vim, vmodl + +from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient +from netboxlabs.diode.sdk.ingester import ( + Cluster, + ClusterGroup, + ClusterType, + Device, + DeviceRole, + DeviceType, + Entity, + Interface, + IPAddress, + Manufacturer, + Platform, + Site, + VirtualDisk, + VirtualMachine, + VMInterface, +) + +log = logging.getLogger("vmware-collector") + +# --------------------------------------------------------------------------- +# Status mappings +# --------------------------------------------------------------------------- + +VM_POWER_STATE_MAP = { + "poweredOn": "active", + "poweredOff": "offline", + "suspended": "offline", +} + +HOST_STATUS_MAP = { + "green": "active", + "yellow": "active", + "red": "failed", + "gray": "planned", +} + +SPEED_TO_TYPE = { + 100: "100base-tx", + 1000: "1000base-t", + 2500: "2.5gbase-t", + 10000: "10gbase-x-sfpp", + 25000: "25gbase-x-sfp28", + 40000: "40gbase-x-qsfpp", + 100000: "100gbase-x-qsfp28", +} + + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + + +def load_dotenv(path: str = ".env") -> None: + if not os.path.isfile(path): + return + with open(path) as fh: + for line in fh: + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, _, val = line.partition("=") + os.environ.setdefault(key.strip(), val.strip().strip("\"'")) + + +def get_config() -> dict: + return { + "host": os.environ.get("VCENTER_HOST", ""), + "user": os.environ.get("VCENTER_USER", "administrator@vsphere.local"), + "password": os.environ.get("VCENTER_PASSWORD", ""), + "port": int(os.environ.get("VCENTER_PORT", "443")), + "verify_ssl": os.environ.get("VCENTER_VERIFY_SSL", "false").lower() == "true", + "site": os.environ.get("VCENTER_SITE", "main"), + } + + +# --------------------------------------------------------------------------- +# Reference helpers +# --------------------------------------------------------------------------- + + +def _device_ref(name: str, model: str, manufacturer: str, role: str, + site_name: str) -> Device: + return Device( + name=name, + device_type=DeviceType( + model=model, + manufacturer=Manufacturer(name=manufacturer), + ), + role=DeviceRole(name=role), + site=Site(name=site_name), + ) + + +def _vm_ref(name: str, cluster_name: str, site_name: str, + role: str = "Virtual Machine") -> VirtualMachine: + return VirtualMachine( + name=name, + site=Site(name=site_name), + cluster=Cluster( + name=cluster_name, + type=ClusterType(name="VMware ESXi"), + scope_site=Site(name=site_name), + ), + role=DeviceRole(name=role), + ) + + +# --------------------------------------------------------------------------- +# vSphere connection +# --------------------------------------------------------------------------- + + +def connect_vsphere(cfg: dict): + """Connect to vCenter/ESXi and return ServiceInstance.""" + host = cfg["host"] + if not host: + log.error("VCENTER_HOST not set") + sys.exit(1) + + context = None + if not cfg["verify_ssl"]: + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + + si = SmartConnect( + host=host, + user=cfg["user"], + pwd=cfg["password"], + port=cfg["port"], + sslContext=context, + ) + atexit.register(Disconnect, si) + log.info("Connected to vSphere: %s", host) + return si + + +def get_all_objects(si, obj_type, folder=None): + """Get all managed objects of a given type.""" + content = si.RetrieveContent() + container = content.viewManager.CreateContainerView( + folder or content.rootFolder, [obj_type], True + ) + objects = list(container.view) + container.Destroy() + return objects + + +# --------------------------------------------------------------------------- +# Entity builders +# --------------------------------------------------------------------------- + + +def build_cluster_entities(si, site_name: str) -> list[Entity]: + """Build Cluster entities from vSphere clusters.""" + entities = [] + clusters = get_all_objects(si, vim.ClusterComputeResource) + + for cluster in clusters: + dc_name = "" + parent = cluster.parent + while parent: + if isinstance(parent, vim.Datacenter): + dc_name = parent.name + break + parent = getattr(parent, "parent", None) + + entities.append(Entity(cluster=Cluster( + name=cluster.name, + type=ClusterType(name="VMware ESXi"), + scope_site=Site(name=site_name), + group=ClusterGroup(name=dc_name) if dc_name else None, + status="active", + tags=["vmware"], + ))) + log.info(" Cluster: %s (DC: %s)", cluster.name, dc_name or "none") + + return entities + + +def build_host_entities(si, site_name: str) -> tuple[list[Entity], dict]: + """Build Device entities from ESXi hosts. Returns entities and host-to-cluster mapping.""" + entities = [] + host_cluster_map = {} + hosts = get_all_objects(si, vim.HostSystem) + + for host in hosts: + hostname = host.name + hw = host.hardware + sys_info = hw.systemInfo if hw else None + + model = sys_info.model if sys_info else "Unknown" + vendor = sys_info.vendor if sys_info else "Unknown" + serial = "" + if sys_info: + for ident in (sys_info.otherIdentifyingInfo or []): + if hasattr(ident, "identifierType") and \ + ident.identifierType and \ + ident.identifierType.key == "ServiceTag": + serial = ident.identifierValue + break + if not serial: + serial = getattr(sys_info, "serialNumber", "") or "" + + status = HOST_STATUS_MAP.get( + str(host.overallStatus) if host.overallStatus else "gray", + "active" + ) + + # Determine cluster + cluster_name = "" + if isinstance(host.parent, vim.ClusterComputeResource): + cluster_name = host.parent.name + host_cluster_map[hostname] = cluster_name + + entities.append(Entity(device=Device( + name=hostname, + device_type=DeviceType( + model=model, + manufacturer=Manufacturer(name=vendor), + ), + role=DeviceRole(name="Hypervisor"), + platform=Platform(name="VMware ESXi"), + site=Site(name=site_name), + serial=serial[:50] if serial else "", + status=status, + tags=["vmware"], + ))) + + # Physical NICs + dev_ref = _device_ref(hostname, model, vendor, "Hypervisor", site_name) + if host.config and host.config.network: + for pnic in (host.config.network.pnic or []): + speed_mbps = 0 + if pnic.linkSpeed: + speed_mbps = pnic.linkSpeed.speedMb + iface_type = SPEED_TO_TYPE.get(speed_mbps, "1000base-t") + + entities.append(Entity(interface=Interface( + device=dev_ref, + name=pnic.device, + type=iface_type, + mac_address=pnic.mac or "", + speed=speed_mbps * 1000 if speed_mbps else 0, + enabled=True, + tags=["vmware"], + ))) + + # VMkernel interfaces + for vnic in (host.config.network.vnic or []): + ip_str = "" + if vnic.spec and vnic.spec.ip: + ip = vnic.spec.ip.ipAddress + mask = vnic.spec.ip.subnetMask + if ip: + prefix_len = _mask_to_prefix(mask) if mask else 24 + ip_str = f"{ip}/{prefix_len}" + + entities.append(Entity(interface=Interface( + device=dev_ref, + name=vnic.device, + type="virtual", + mac_address=vnic.spec.mac if vnic.spec else "", + enabled=True, + tags=["vmware", "vmkernel"], + ))) + + if ip_str: + entities.append(Entity(ip_address=IPAddress( + address=ip_str, + status="active", + assigned_object_interface=Interface( + device=dev_ref, + name=vnic.device, + type="virtual", + ), + tags=["vmware"], + ))) + + log.info(" Host: %s (%s %s, cluster=%s)", + hostname, vendor, model, cluster_name or "standalone") + + return entities, host_cluster_map + + +def build_vm_entities(si, site_name: str, + host_cluster_map: dict) -> list[Entity]: + """Build VirtualMachine + VMInterface + VirtualDisk + IPAddress entities.""" + entities = [] + vms = get_all_objects(si, vim.VirtualMachine) + + for vm_obj in vms: + try: + vm_name = vm_obj.name + config = vm_obj.config + if not config: + log.debug(" Skipping VM with no config: %s", vm_name) + continue + + # Determine cluster from host + host_name = "" + cluster_name = "" + if vm_obj.runtime and vm_obj.runtime.host: + host_name = vm_obj.runtime.host.name + cluster_name = host_cluster_map.get(host_name, "") + if not cluster_name: + cluster_name = host_name or "standalone" + + power_state = str(vm_obj.runtime.powerState) if vm_obj.runtime else "poweredOff" + status = VM_POWER_STATE_MAP.get(power_state, "offline") + + # Resources + vcpus = config.hardware.numCPU if config.hardware else 0 + memory_mb = config.hardware.memoryMB if config.hardware else 0 + total_disk_gb = 0 + + # Determine platform from guest + guest_os = config.guestFullName or config.guestId or "" + platform_name = None + if "linux" in guest_os.lower() or "ubuntu" in guest_os.lower() or \ + "centos" in guest_os.lower() or "debian" in guest_os.lower(): + platform_name = "Linux" + elif "windows" in guest_os.lower(): + platform_name = "Windows" + + # Collect IPs for primary_ip4 + primary_ip4 = None + vm_ips = [] + + # Guest NIC info (requires VMware Tools) + if vm_obj.guest and vm_obj.guest.net: + for guest_nic in vm_obj.guest.net: + if guest_nic.ipConfig: + for ip_entry in guest_nic.ipConfig.ipAddress: + addr = ip_entry.ipAddress + prefix = ip_entry.prefixLength + if addr and not addr.startswith("fe80") and \ + not addr.startswith("127."): + ip_str = f"{addr}/{prefix}" + nic_name = guest_nic.network or "eth0" + vm_ips.append((ip_str, nic_name)) + if not primary_ip4 and ":" not in addr: + primary_ip4 = ip_str + + # VirtualMachine entity + vm_kwargs = dict( + name=vm_name, + status=status, + site=Site(name=site_name), + cluster=Cluster( + name=cluster_name, + type=ClusterType(name="VMware ESXi"), + scope_site=Site(name=site_name), + ), + role=DeviceRole(name="Virtual Machine"), + vcpus=vcpus, + memory=memory_mb, + comments=f"Guest: {guest_os}" if guest_os else "", + tags=["vmware"], + ) + if platform_name: + vm_kwargs["platform"] = Platform(name=platform_name) + if primary_ip4: + vm_kwargs["primary_ip4"] = IPAddress(address=primary_ip4) + + entities.append(Entity(virtual_machine=VirtualMachine(**vm_kwargs))) + + # VM NICs + vm_ref = _vm_ref(vm_name, cluster_name, site_name) + if config.hardware and config.hardware.device: + for device in config.hardware.device: + if isinstance(device, vim.vm.device.VirtualEthernetCard): + nic_name = device.deviceInfo.label if device.deviceInfo else f"nic{device.key}" + mac = getattr(device, "macAddress", "") or "" + net_name = "" + if hasattr(device, "backing"): + backing = device.backing + if hasattr(backing, "network") and backing.network: + net_name = backing.network.name + elif hasattr(backing, "deviceName"): + net_name = backing.deviceName + + entities.append(Entity(vm_interface=VMInterface( + virtual_machine=vm_ref, + name=nic_name[:64], + enabled=device.connectable.connected if device.connectable else True, + mac_address=mac, + description=net_name[:200] if net_name else "", + tags=["vmware"], + ))) + + # Virtual Disks + elif isinstance(device, vim.vm.device.VirtualDisk): + disk_name = device.deviceInfo.label if device.deviceInfo else f"disk{device.key}" + disk_size_gb = device.capacityInKB // (1024 * 1024) if device.capacityInKB else 0 + total_disk_gb += disk_size_gb + + if disk_size_gb > 0: + entities.append(Entity(virtual_disk=VirtualDisk( + virtual_machine=vm_ref, + name=disk_name[:64], + size=disk_size_gb, + tags=["vmware"], + ))) + + # IP entities from guest tools + for ip_str, nic_name in vm_ips: + entities.append(Entity(ip_address=IPAddress( + address=ip_str, + status="active", + assigned_object_vm_interface=VMInterface( + virtual_machine=vm_ref, + name=nic_name[:64], + ), + tags=["vmware"], + ))) + + log.info(" VM: %s (%s, %d vCPU, %d MB RAM, %d GB disk)", + vm_name, status, vcpus, memory_mb, total_disk_gb) + + except Exception as exc: + log.error(" Failed to process VM %s: %s", + getattr(vm_obj, "name", "?"), exc) + + return entities + + +def _mask_to_prefix(mask: str) -> int: + """Convert subnet mask to prefix length.""" + try: + return sum(bin(int(x)).count("1") for x in mask.split(".")) + except (ValueError, AttributeError): + return 24 + + +# --------------------------------------------------------------------------- +# Orchestration +# --------------------------------------------------------------------------- + + +def collect_all_entities(cfg: dict) -> list[Entity]: + si = connect_vsphere(cfg) + site_name = cfg["site"] + entities: list[Entity] = [] + + # Clusters + entities.extend(build_cluster_entities(si, site_name)) + + # ESXi hosts + interfaces + host_entities, host_cluster_map = build_host_entities(si, site_name) + entities.extend(host_entities) + + # VMs + entities.extend(build_vm_entities(si, site_name, host_cluster_map)) + + return entities + + +def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: + if not entities: + log.warning("No entities to ingest") + return + + target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode") + client_id = os.environ.get("DIODE_CLIENT_ID", + os.environ.get("INGESTER_CLIENT_ID", "diode-ingester")) + client_secret = os.environ.get("DIODE_CLIENT_SECRET", + os.environ.get("INGESTER_CLIENT_SECRET", "")) + + if dry_run: + log.info("DRY RUN: %d entities would be ingested", len(entities)) + for i, e in enumerate(entities): + log.info(" [%d] %s", i, e) + return + + if not client_secret: + log.error("DIODE_CLIENT_SECRET not set — cannot ingest") + sys.exit(1) + + log.info("Ingesting %d entities to %s ...", len(entities), target) + + from netboxlabs.diode.sdk.ingester import create_message_chunks + + with DiodeClient( + target=target, + client_id=client_id, + client_secret=client_secret, + app_name="vmware-collector", + app_version="0.1.0", + ) as client: + chunks = create_message_chunks(entities) + for idx, chunk in enumerate(chunks): + resp = client.ingest(entities=chunk) + if resp.errors: + log.error("Chunk %d errors: %s", idx, resp.errors) + else: + log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + + +def main(): + parser = argparse.ArgumentParser(description="VMware vSphere collector for NetBox") + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--log-level", default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"]) + parser.add_argument("--env-file", default=".env") + args = parser.parse_args() + + logging.basicConfig( + level=getattr(logging, args.log_level), + format="%(asctime)s %(name)s %(levelname)s %(message)s", + ) + + load_dotenv(args.env_file) + cfg = get_config() + + entities = collect_all_entities(cfg) + log.info("Total entities: %d", len(entities)) + + ingest_entities(entities, dry_run=args.dry_run) + log.info("Done!") + + +if __name__ == "__main__": + main() diff --git a/collectors/zabbix_collector.py b/collectors/zabbix_collector.py new file mode 100644 index 0000000..cd82734 --- /dev/null +++ b/collectors/zabbix_collector.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python3 +"""Zabbix collector for NetBox via Diode SDK. + +Pulls device inventory from Zabbix API for brownfield import into NetBox. +Also adds Zabbix host IDs as custom fields for cross-referencing. + +Usage: + python collectors/zabbix_collector.py --dry-run + python collectors/zabbix_collector.py +""" + +import argparse +import logging +import os +import sys + +from pyzabbix import ZabbixAPI + +from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient +from netboxlabs.diode.sdk.ingester import ( + CustomFieldValue, + Device, + DeviceRole, + DeviceType, + Entity, + Interface, + IPAddress, + Manufacturer, + Platform, + Site, +) + +log = logging.getLogger("zabbix-collector") + +# --------------------------------------------------------------------------- +# Zabbix → NetBox mappings +# --------------------------------------------------------------------------- + +ZABBIX_STATUS_MAP = { + 0: "active", # Monitored + 1: "offline", # Unmonitored +} + +ZABBIX_IFACE_TYPE = { + 1: "agent", # Zabbix agent + 2: "snmp", # SNMP + 3: "ipmi", # IPMI + 4: "jmx", # JMX +} + +# Best-effort OS → Platform mapping from Zabbix template groups +OS_KEYWORDS_TO_PLATFORM = { + "linux": "Linux", + "windows": "Windows", + "cisco": "Cisco IOS", + "juniper": "Juniper Junos", + "freebsd": "FreeBSD", + "vmware": "VMware ESXi", + "proxmox": "Proxmox VE", + "ubuntu": "Ubuntu", + "debian": "Debian", + "centos": "CentOS", + "rhel": "Red Hat Enterprise Linux", +} + + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + + +def load_dotenv(path: str = ".env") -> None: + if not os.path.isfile(path): + return + with open(path) as fh: + for line in fh: + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, _, val = line.partition("=") + os.environ.setdefault(key.strip(), val.strip().strip("\"'")) + + +def get_config() -> dict: + return { + "url": os.environ.get("ZABBIX_URL", ""), + "user": os.environ.get("ZABBIX_USER", "Admin"), + "password": os.environ.get("ZABBIX_PASSWORD", ""), + "api_token": os.environ.get("ZABBIX_API_TOKEN", ""), + "site": os.environ.get("ZABBIX_SITE", "main"), + "default_role": os.environ.get("ZABBIX_DEFAULT_ROLE", "Server"), + "group_to_role": {}, # Could be loaded from config + } + + +# --------------------------------------------------------------------------- +# Device reference helper +# --------------------------------------------------------------------------- + + +def _device_ref(name: str, model: str, manufacturer: str, role: str, + site_name: str) -> Device: + return Device( + name=name, + device_type=DeviceType( + model=model, + manufacturer=Manufacturer(name=manufacturer), + ), + role=DeviceRole(name=role), + site=Site(name=site_name), + ) + + +# --------------------------------------------------------------------------- +# Data collection +# --------------------------------------------------------------------------- + + +def connect_zabbix(cfg: dict) -> ZabbixAPI: + url = cfg["url"] + if not url: + log.error("ZABBIX_URL not set") + sys.exit(1) + + zapi = ZabbixAPI(url) + zapi.session.verify = False + + if cfg.get("api_token"): + zapi.login(api_token=cfg["api_token"]) + else: + zapi.login(cfg["user"], cfg["password"]) + + log.info("Connected to Zabbix %s", zapi.api_version()) + return zapi + + +def collect_hosts(zapi: ZabbixAPI) -> list[dict]: + """Fetch all hosts with their interfaces and inventory data.""" + hosts = zapi.host.get( + output=["hostid", "host", "name", "status", "description"], + selectInterfaces=["interfaceid", "ip", "dns", "port", "type", "main", "useip"], + selectInventory="extend", + selectGroups=["name"], + selectParentTemplates=["name"], + ) + log.info("Fetched %d hosts from Zabbix", len(hosts)) + return hosts + + +# --------------------------------------------------------------------------- +# Entity builders +# --------------------------------------------------------------------------- + + +def guess_platform(host_data: dict) -> str | None: + """Try to guess platform from Zabbix templates/groups/inventory.""" + # Check inventory OS field + inventory = host_data.get("inventory") or {} + if isinstance(inventory, dict): + os_full = inventory.get("os_full") or inventory.get("os_short") or "" + for keyword, platform in OS_KEYWORDS_TO_PLATFORM.items(): + if keyword.lower() in os_full.lower(): + return platform + + # Check template names + templates = host_data.get("parentTemplates") or [] + for tmpl in templates: + tmpl_name = (tmpl.get("name") or "").lower() + for keyword, platform in OS_KEYWORDS_TO_PLATFORM.items(): + if keyword in tmpl_name: + return platform + + return None + + +def guess_role(host_data: dict, default_role: str) -> str: + """Try to guess device role from Zabbix groups.""" + groups = host_data.get("groups") or [] + for group in groups: + gname = (group.get("name") or "").lower() + if "router" in gname: + return "Router" + if "switch" in gname: + return "Switch" + if "firewall" in gname: + return "Firewall" + if "hypervisor" in gname: + return "Hypervisor" + if "server" in gname or "linux" in gname or "windows" in gname: + return "Server" + return default_role + + +def build_host_entities(host_data: dict, cfg: dict) -> list[Entity]: + """Build Device + Interface + IPAddress entities from a Zabbix host.""" + entities = [] + site_name = cfg["site"] + default_role = cfg["default_role"] + + hostname = host_data.get("name") or host_data.get("host", "unknown") + status = ZABBIX_STATUS_MAP.get(int(host_data.get("status", 0)), "active") + host_id = host_data.get("hostid", "") + + # Inventory data + inventory = host_data.get("inventory") or {} + if isinstance(inventory, list): + inventory = {} + serial = inventory.get("serialno_a") or "" + model = inventory.get("model") or inventory.get("hardware") or "Unknown" + vendor = inventory.get("vendor") or inventory.get("hardware_full") or "Unknown" + asset_tag = inventory.get("asset_tag") or "" + + role = guess_role(host_data, default_role) + platform = guess_platform(host_data) + + # Custom fields for cross-referencing + custom_fields = {} + if host_id: + custom_fields["zabbix_host_id"] = CustomFieldValue(text=str(host_id)) + + # Device entity + device_kwargs = dict( + name=hostname, + device_type=DeviceType( + model=model, + manufacturer=Manufacturer(name=vendor), + ), + role=DeviceRole(name=role), + site=Site(name=site_name), + status=status, + serial=serial[:50] if serial else "", + asset_tag=asset_tag[:50] if asset_tag else "", + tags=["zabbix"], + ) + if platform: + device_kwargs["platform"] = Platform(name=platform) + if custom_fields: + device_kwargs["custom_fields"] = custom_fields + + entities.append(Entity(device=Device(**device_kwargs))) + + # Interface + IP entities from Zabbix interfaces + dev_ref = _device_ref(hostname, model, vendor, role, site_name) + primary_ip = None + + for iface in host_data.get("interfaces", []): + iface_type_num = int(iface.get("type", 1)) + iface_type_name = ZABBIX_IFACE_TYPE.get(iface_type_num, "agent") + ip = iface.get("ip", "") + is_main = str(iface.get("main", "0")) == "1" + + if not ip or ip == "0.0.0.0": + continue + + # Interface name based on type + iface_name = f"zabbix-{iface_type_name}" + if is_main and iface_type_name == "agent": + iface_name = "mgmt0" + elif iface_type_name == "snmp": + iface_name = "snmp0" + elif iface_type_name == "ipmi": + iface_name = "ipmi0" + + entities.append(Entity(interface=Interface( + device=dev_ref, + name=iface_name, + type="other", + enabled=True, + tags=["zabbix"], + ))) + + # IP address + ip_str = f"{ip}/32" # Zabbix doesn't provide prefix length + entities.append(Entity(ip_address=IPAddress( + address=ip_str, + status="active", + assigned_object_interface=Interface( + device=dev_ref, + name=iface_name, + type="other", + ), + tags=["zabbix"], + ))) + + if is_main and primary_ip is None: + primary_ip = ip_str + + return entities + + +# --------------------------------------------------------------------------- +# Orchestration +# --------------------------------------------------------------------------- + + +def collect_all_entities(cfg: dict) -> list[Entity]: + zapi = connect_zabbix(cfg) + hosts = collect_hosts(zapi) + + entities: list[Entity] = [] + for host in hosts: + try: + entities.extend(build_host_entities(host, cfg)) + except Exception as exc: + hostname = host.get("name") or host.get("host", "?") + log.error("Failed to process Zabbix host %s: %s", hostname, exc) + + return entities + + +def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: + if not entities: + log.warning("No entities to ingest") + return + + target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode") + client_id = os.environ.get("DIODE_CLIENT_ID", + os.environ.get("INGESTER_CLIENT_ID", "diode-ingester")) + client_secret = os.environ.get("DIODE_CLIENT_SECRET", + os.environ.get("INGESTER_CLIENT_SECRET", "")) + + if dry_run: + log.info("DRY RUN: %d entities would be ingested", len(entities)) + for i, e in enumerate(entities): + log.info(" [%d] %s", i, e) + return + + if not client_secret: + log.error("DIODE_CLIENT_SECRET not set — cannot ingest") + sys.exit(1) + + log.info("Ingesting %d entities to %s ...", len(entities), target) + + from netboxlabs.diode.sdk.ingester import create_message_chunks + + with DiodeClient( + target=target, + client_id=client_id, + client_secret=client_secret, + app_name="zabbix-collector", + app_version="0.1.0", + ) as client: + chunks = create_message_chunks(entities) + for idx, chunk in enumerate(chunks): + resp = client.ingest(entities=chunk) + if resp.errors: + log.error("Chunk %d errors: %s", idx, resp.errors) + else: + log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + + +def main(): + parser = argparse.ArgumentParser(description="Zabbix collector for NetBox") + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--log-level", default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"]) + parser.add_argument("--env-file", default=".env") + args = parser.parse_args() + + logging.basicConfig( + level=getattr(logging, args.log_level), + format="%(asctime)s %(name)s %(levelname)s %(message)s", + ) + + load_dotenv(args.env_file) + cfg = get_config() + + entities = collect_all_entities(cfg) + log.info("Total entities: %d", len(entities)) + + ingest_entities(entities, dry_run=args.dry_run) + log.info("Done!") + + +if __name__ == "__main__": + main()