Add network, CML, Zabbix, Observium, VMware, and Docker collectors

Six new collectors for ingesting infrastructure data into NetBox via
the Diode SDK pipeline:

- network_collector: Cisco/Brocade devices via NAPALM + pyATS/Genie
  with LLDP/CDP cable discovery, VLANs, VRFs, prefixes, device configs,
  inventory items, and BGP push to netbox-bgp plugin API
- cml_collector: Cisco Modeling Labs topology sync (nodes, links, configs)
- zabbix_collector: Brownfield import from Zabbix API with cross-ref
  custom fields
- observium_collector: Device/port/IP import from Observium REST API
- vmware_collector: vCenter/ESXi hosts, VMs, interfaces, disks, IPs
- docker_collector: Container discovery via Docker API (tested: 21
  containers found on local host)

Also adds inventory.yaml.example template for network device credentials.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
sam 2026-02-28 03:17:40 -07:00
parent a5b37c0dd5
commit b4fcdfa277
8 changed files with 3210 additions and 0 deletions

0
collectors/__init__.py Normal file
View File

460
collectors/cml_collector.py Normal file
View File

@ -0,0 +1,460 @@
#!/usr/bin/env python3
"""Cisco Modeling Labs collector for NetBox via Diode SDK.
Syncs CML lab topology into NetBox: devices (nodes), interfaces, cables (links),
L3 addresses, and device configs.
Usage:
python collectors/cml_collector.py --dry-run
python collectors/cml_collector.py
"""
import argparse
import logging
import os
import re
import sys
from virl2_client import ClientLibrary
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
Cable,
Device,
DeviceConfig,
DeviceRole,
DeviceType,
Entity,
GenericObject,
Interface,
IPAddress,
Manufacturer,
Platform,
Site,
)
log = logging.getLogger("cml-collector")
# ---------------------------------------------------------------------------
# CML node definition → NetBox mappings
# ---------------------------------------------------------------------------
NODE_DEF_TO_PLATFORM = {
"iosv": "Cisco IOS",
"iosvl2": "Cisco IOS",
"iosxrv": "Cisco IOS-XR",
"iosxrv9000": "Cisco IOS-XR",
"csr1000v": "Cisco IOS-XE",
"cat8000v": "Cisco IOS-XE",
"cat9000v": "Cisco IOS-XE",
"nxosv": "Cisco NX-OS",
"nxosv9000": "Cisco NX-OS",
"asav": "Cisco ASA",
"server": "Linux",
"ubuntu": "Ubuntu",
"alpine": "Alpine Linux",
"desktop": "Linux",
"trex": "TRex Traffic Generator",
"wan_emulator": "Linux",
"external_connector": "External Connector",
"unmanaged_switch": "Unmanaged Switch",
}
NODE_DEF_TO_MANUFACTURER = {
"iosv": "Cisco",
"iosvl2": "Cisco",
"iosxrv": "Cisco",
"iosxrv9000": "Cisco",
"csr1000v": "Cisco",
"cat8000v": "Cisco",
"cat9000v": "Cisco",
"nxosv": "Cisco",
"nxosv9000": "Cisco",
"asav": "Cisco",
}
NODE_DEF_TO_ROLE = {
"iosv": "Router",
"iosvl2": "Switch",
"iosxrv": "Router",
"iosxrv9000": "Router",
"csr1000v": "Router",
"cat8000v": "Router",
"cat9000v": "Switch",
"nxosv": "Switch",
"nxosv9000": "Switch",
"asav": "Firewall",
"server": "Server",
"ubuntu": "Server",
"alpine": "Server",
"desktop": "Server",
"external_connector": "Patch Panel",
"unmanaged_switch": "Switch",
}
NODE_DEF_TO_MODEL = {
"iosv": "IOSv",
"iosvl2": "IOSvL2",
"iosxrv": "IOS-XRv",
"iosxrv9000": "IOS-XRv 9000",
"csr1000v": "CSR1000v",
"cat8000v": "Catalyst 8000V",
"cat9000v": "Catalyst 9000V",
"nxosv": "NX-OSv",
"nxosv9000": "NX-OSv 9000",
"asav": "ASAv",
}
CML_STATE_TO_STATUS = {
"BOOTED": "active",
"STARTED": "active",
"STOPPED": "offline",
"DEFINED_ON_CORE": "planned",
"QUEUED": "planned",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_config() -> dict:
return {
"host": os.environ.get("CML_HOST", ""),
"user": os.environ.get("CML_USER", "admin"),
"password": os.environ.get("CML_PASSWORD", ""),
"lab": os.environ.get("CML_LAB", ""),
"verify_ssl": os.environ.get("CML_VERIFY_SSL", "false").lower() == "true",
"site": os.environ.get("CML_SITE", "CML"),
}
# ---------------------------------------------------------------------------
# Device reference helper
# ---------------------------------------------------------------------------
def _device_ref(name: str, node_def: str, site_name: str) -> Device:
model = NODE_DEF_TO_MODEL.get(node_def, node_def)
manufacturer = NODE_DEF_TO_MANUFACTURER.get(node_def, "Generic")
role = NODE_DEF_TO_ROLE.get(node_def, "Network Device")
return Device(
name=name,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=manufacturer),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
)
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def build_node_entity(node, site_name: str) -> Entity:
node_def = node.node_definition or "unknown"
model = NODE_DEF_TO_MODEL.get(node_def, node_def)
manufacturer = NODE_DEF_TO_MANUFACTURER.get(node_def, "Generic")
role = NODE_DEF_TO_ROLE.get(node_def, "Network Device")
platform = NODE_DEF_TO_PLATFORM.get(node_def, node_def)
status = CML_STATE_TO_STATUS.get(node.state, "planned")
return Entity(device=Device(
name=node.label,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=manufacturer),
),
role=DeviceRole(name=role),
platform=Platform(name=platform),
site=Site(name=site_name),
status=status,
tags=["cml"],
))
def build_interface_entities(node, site_name: str) -> list[Entity]:
entities = []
node_def = node.node_definition or "unknown"
for iface in node.interfaces():
iface_name = iface.label or f"iface{iface.slot}"
# Map interface type from name
iface_type = "virtual"
if re.match(r"^(Gi|GigabitEthernet)", iface_name, re.IGNORECASE):
iface_type = "1000base-t"
elif re.match(r"^(Te|TenGig)", iface_name, re.IGNORECASE):
iface_type = "10gbase-x-sfpp"
elif re.match(r"^(Fa|FastEthernet)", iface_name, re.IGNORECASE):
iface_type = "100base-tx"
elif re.match(r"^(Lo|Loopback)", iface_name, re.IGNORECASE):
iface_type = "virtual"
elif re.match(r"^(eth|ens|enp)", iface_name, re.IGNORECASE):
iface_type = "1000base-t"
entities.append(Entity(interface=Interface(
device=_device_ref(node.label, node_def, site_name),
name=iface_name,
type=iface_type,
enabled=node.state in ("BOOTED", "STARTED"),
tags=["cml"],
)))
return entities
def build_ip_entities(node, site_name: str) -> list[Entity]:
"""Build IP entities from CML node L3 addresses."""
entities = []
node_def = node.node_definition or "unknown"
for iface in node.interfaces():
iface_name = iface.label or f"iface{iface.slot}"
# CML may expose discovered L3 addresses
try:
if hasattr(iface, "discovered_ipv4") and iface.discovered_ipv4:
for addr in iface.discovered_ipv4:
if addr and not addr.startswith("127."):
ip_str = addr if "/" in addr else f"{addr}/24"
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=_device_ref(node.label, node_def, site_name),
name=iface_name,
type="virtual",
),
tags=["cml"],
)))
if hasattr(iface, "discovered_ipv6") and iface.discovered_ipv6:
for addr in iface.discovered_ipv6:
if addr and not addr.lower().startswith("fe80"):
ip_str = addr if "/" in addr else f"{addr}/64"
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=_device_ref(node.label, node_def, site_name),
name=iface_name,
type="virtual",
),
tags=["cml"],
)))
except Exception as exc:
log.debug(" IP discovery unavailable for %s:%s: %s",
node.label, iface_name, exc)
return entities
def build_cable_entities(lab, site_name: str,
node_map: dict[str, str]) -> list[Entity]:
"""Build Cable entities from CML links.
node_map: {node_id: (node_label, node_definition)}
"""
entities = []
for link in lab.links():
try:
iface_a = link.interface_a
iface_b = link.interface_b
node_a = iface_a.node
node_b = iface_b.node
a_name = iface_a.label or f"iface{iface_a.slot}"
b_name = iface_b.label or f"iface{iface_b.slot}"
a_node_def = node_a.node_definition or "unknown"
b_node_def = node_b.node_definition or "unknown"
cable = Cable(
a_terminations=[GenericObject(object_interface=Interface(
device=_device_ref(node_a.label, a_node_def, site_name),
name=a_name,
type="virtual",
))],
b_terminations=[GenericObject(object_interface=Interface(
device=_device_ref(node_b.label, b_node_def, site_name),
name=b_name,
type="virtual",
))],
status="connected",
tags=["cml"],
)
entities.append(Entity(cable=cable))
log.info(" Cable: %s:%s <-> %s:%s",
node_a.label, a_name, node_b.label, b_name)
except Exception as exc:
log.warning(" Failed to build cable for link: %s", exc)
return entities
def build_config_entity(node, site_name: str) -> Entity | None:
"""Build DeviceConfig entity from CML node configuration."""
node_def = node.node_definition or "unknown"
try:
config = node.config
if not config:
return None
return Entity(
device=_device_ref(node.label, node_def, site_name),
device_config=DeviceConfig(
startup=config.encode("utf-8") if config else None,
),
)
except Exception as exc:
log.debug(" Config unavailable for %s: %s", node.label, exc)
return None
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def collect_all_entities(cfg: dict) -> list[Entity]:
host = cfg["host"]
if not host:
log.error("CML_HOST not set")
sys.exit(1)
url = f"https://{host}" if not host.startswith("http") else host
log.info("Connecting to CML at %s...", url)
client = ClientLibrary(url, cfg["user"], cfg["password"],
ssl_verify=cfg["verify_ssl"])
site_name = cfg["site"]
entities: list[Entity] = []
# Get labs
labs = client.all_labs()
target_lab = cfg.get("lab")
if target_lab:
labs = [l for l in labs if l.title == target_lab or l.id == target_lab]
if not labs:
log.error("Lab '%s' not found", target_lab)
sys.exit(1)
for lab in labs:
lab.sync()
log.info("Lab: %s (%s) — %d nodes, %d links",
lab.title, lab.state(), len(lab.nodes()), len(lab.links()))
node_map = {}
for node in lab.nodes():
node_def = node.node_definition or "unknown"
node_map[node.id] = (node.label, node_def)
# Skip external connectors and unmanaged switches for device creation
if node_def in ("external_connector", "unmanaged_switch"):
log.debug(" Skipping non-device node: %s (%s)", node.label, node_def)
# Still create interface entities for cable termination
entities.extend(build_interface_entities(node, site_name))
continue
# Device
entities.append(build_node_entity(node, site_name))
# Interfaces
entities.extend(build_interface_entities(node, site_name))
# IPs
entities.extend(build_ip_entities(node, site_name))
# Config
config_entity = build_config_entity(node, site_name)
if config_entity:
entities.append(config_entity)
# Cables from links
entities.extend(build_cable_entities(lab, site_name, node_map))
return entities
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
if not entities:
log.warning("No entities to ingest")
return
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
client_id = os.environ.get("DIODE_CLIENT_ID",
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
os.environ.get("INGESTER_CLIENT_SECRET", ""))
if dry_run:
log.info("DRY RUN: %d entities would be ingested", len(entities))
for i, e in enumerate(entities):
log.info(" [%d] %s", i, e)
return
if not client_secret:
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
sys.exit(1)
log.info("Ingesting %d entities to %s ...", len(entities), target)
from netboxlabs.diode.sdk.ingester import create_message_chunks
with DiodeClient(
target=target,
client_id=client_id,
client_secret=client_secret,
app_name="cml-collector",
app_version="0.1.0",
) as client:
chunks = create_message_chunks(entities)
for idx, chunk in enumerate(chunks):
resp = client.ingest(entities=chunk)
if resp.errors:
log.error("Chunk %d errors: %s", idx, resp.errors)
else:
log.info("Chunk %d: %d entities ingested", idx, len(chunk))
def main():
parser = argparse.ArgumentParser(description="CML topology collector for NetBox")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
load_dotenv(args.env_file)
cfg = get_config()
entities = collect_all_entities(cfg)
log.info("Total entities: %d", len(entities))
ingest_entities(entities, dry_run=args.dry_run)
log.info("Done!")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,362 @@
#!/usr/bin/env python3
"""Docker container collector for NetBox via Diode SDK.
Discovers Docker containers across one or more hosts and ingests them
into NetBox as VirtualMachines with VMInterfaces and IPAddresses.
Usage:
python collectors/docker_collector.py --dry-run
python collectors/docker_collector.py
"""
import argparse
import logging
import os
import sys
import docker
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
Cluster,
ClusterType,
CustomFieldValue,
Device,
DeviceRole,
DeviceType,
Entity,
IPAddress,
Manufacturer,
Platform,
Site,
VirtualMachine,
VMInterface,
)
log = logging.getLogger("docker-collector")
# ---------------------------------------------------------------------------
# Status mappings
# ---------------------------------------------------------------------------
CONTAINER_STATUS_MAP = {
"running": "active",
"paused": "active",
"restarting": "active",
"created": "planned",
"exited": "offline",
"dead": "failed",
"removing": "decommissioning",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_config() -> dict:
# Support multiple Docker hosts (comma-separated)
hosts_str = os.environ.get("DOCKER_HOSTS", "")
if hosts_str:
hosts = [h.strip() for h in hosts_str.split(",") if h.strip()]
else:
hosts = ["local"] # Use local Docker socket
return {
"hosts": hosts,
"site": os.environ.get("DOCKER_SITE", "main"),
"tls_verify": os.environ.get("DOCKER_TLS_VERIFY", "false").lower() == "true",
}
# ---------------------------------------------------------------------------
# VM reference helper
# ---------------------------------------------------------------------------
def _vm_ref(name: str, cluster_name: str, site_name: str) -> VirtualMachine:
return VirtualMachine(
name=name,
site=Site(name=site_name),
cluster=Cluster(
name=cluster_name,
type=ClusterType(name="Docker"),
scope_site=Site(name=site_name),
),
role=DeviceRole(name="Docker Container"),
)
# ---------------------------------------------------------------------------
# Data collection
# ---------------------------------------------------------------------------
def connect_docker(host: str, tls_verify: bool = False) -> docker.DockerClient:
"""Connect to a Docker host."""
if host == "local":
return docker.from_env()
else:
tls_config = None
if host.startswith("https://") and tls_verify:
tls_config = docker.tls.TLSConfig(verify=True)
return docker.DockerClient(base_url=host, tls=tls_config)
def get_host_info(client: docker.DockerClient) -> dict:
"""Get Docker host system info."""
return client.info()
def get_containers(client: docker.DockerClient, all_containers: bool = True) -> list:
"""Get all containers from Docker host."""
return client.containers.list(all=all_containers)
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def build_cluster_entity(host_name: str, site_name: str) -> Entity:
"""Build a Cluster entity for the Docker host."""
return Entity(cluster=Cluster(
name=host_name,
type=ClusterType(name="Docker"),
scope_site=Site(name=site_name),
status="active",
tags=["docker"],
))
def build_container_entities(container, host_name: str,
site_name: str) -> list[Entity]:
"""Build VirtualMachine + VMInterface + IPAddress entities for a container."""
entities = []
# Container info
name = container.name
status = CONTAINER_STATUS_MAP.get(container.status, "active")
image = container.image.tags[0] if container.image.tags else str(container.image.id)[:20]
short_id = container.short_id
# Labels/env for metadata
labels = container.labels or {}
compose_project = labels.get("com.docker.compose.project", "")
compose_service = labels.get("com.docker.compose.service", "")
# Custom fields
custom_fields = {
"docker_container_id": CustomFieldValue(text=short_id),
}
if compose_project:
custom_fields["docker_compose_project"] = CustomFieldValue(text=compose_project)
# VirtualMachine entity
vm_kwargs = dict(
name=name,
status=status,
site=Site(name=site_name),
cluster=Cluster(
name=host_name,
type=ClusterType(name="Docker"),
scope_site=Site(name=site_name),
),
role=DeviceRole(name="Docker Container"),
platform=Platform(name="Docker"),
comments=f"Image: {image}",
tags=["docker"],
custom_fields=custom_fields,
)
entities.append(Entity(virtual_machine=VirtualMachine(**vm_kwargs)))
# Network interfaces and IPs
vm_ref = _vm_ref(name, host_name, site_name)
try:
# container.attrs has full inspect data
networks = container.attrs.get("NetworkSettings", {}).get("Networks", {})
for net_name, net_data in networks.items():
ip = net_data.get("IPAddress", "")
mac = net_data.get("MacAddress", "")
gateway = net_data.get("Gateway", "")
prefix_len = net_data.get("IPPrefixLen", 0)
ipv6 = net_data.get("GlobalIPv6Address", "")
ipv6_prefix = net_data.get("GlobalIPv6PrefixLen", 0)
# VMInterface
iface_name = net_name[:64]
iface_kwargs = dict(
virtual_machine=vm_ref,
name=iface_name,
enabled=True,
tags=["docker"],
)
if mac:
iface_kwargs["mac_address"] = mac
entities.append(Entity(vm_interface=VMInterface(**iface_kwargs)))
# IPv4 address
if ip and ip != "0.0.0.0":
ip_str = f"{ip}/{prefix_len}" if prefix_len else f"{ip}/24"
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_vm_interface=VMInterface(
virtual_machine=vm_ref,
name=iface_name,
),
tags=["docker"],
)))
# IPv6 address
if ipv6:
ipv6_str = f"{ipv6}/{ipv6_prefix}" if ipv6_prefix else f"{ipv6}/64"
entities.append(Entity(ip_address=IPAddress(
address=ipv6_str,
status="active",
assigned_object_vm_interface=VMInterface(
virtual_machine=vm_ref,
name=iface_name,
),
tags=["docker"],
)))
except Exception as exc:
log.warning(" Network info unavailable for %s: %s", name, exc)
return entities
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def collect_all_entities(cfg: dict) -> list[Entity]:
site_name = cfg["site"]
entities: list[Entity] = []
for host in cfg["hosts"]:
log.info("Connecting to Docker host: %s", host)
try:
client = connect_docker(host, cfg["tls_verify"])
except Exception as exc:
log.error("Failed to connect to Docker host %s: %s", host, exc)
continue
# Get host info for cluster name
try:
info = get_host_info(client)
host_name = info.get("Name", host)
log.info(" Host: %s (Docker %s, %d containers)",
host_name, info.get("ServerVersion", "?"),
info.get("Containers", 0))
except Exception as exc:
log.warning(" Cannot get host info: %s", exc)
host_name = host
# Cluster entity
entities.append(build_cluster_entity(host_name, site_name))
# Container entities
try:
containers = get_containers(client)
log.info(" Found %d containers", len(containers))
for container in containers:
try:
entities.extend(build_container_entities(
container, host_name, site_name
))
except Exception as exc:
log.error(" Failed to process container %s: %s",
container.name, exc)
except Exception as exc:
log.error(" Failed to list containers on %s: %s", host_name, exc)
return entities
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
if not entities:
log.warning("No entities to ingest")
return
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
client_id = os.environ.get("DIODE_CLIENT_ID",
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
os.environ.get("INGESTER_CLIENT_SECRET", ""))
if dry_run:
log.info("DRY RUN: %d entities would be ingested", len(entities))
for i, e in enumerate(entities):
log.info(" [%d] %s", i, e)
return
if not client_secret:
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
sys.exit(1)
log.info("Ingesting %d entities to %s ...", len(entities), target)
from netboxlabs.diode.sdk.ingester import create_message_chunks
with DiodeClient(
target=target,
client_id=client_id,
client_secret=client_secret,
app_name="docker-collector",
app_version="0.1.0",
) as client:
chunks = create_message_chunks(entities)
for idx, chunk in enumerate(chunks):
resp = client.ingest(entities=chunk)
if resp.errors:
log.error("Chunk %d errors: %s", idx, resp.errors)
else:
log.info("Chunk %d: %d entities ingested", idx, len(chunk))
def main():
parser = argparse.ArgumentParser(description="Docker container collector for NetBox")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--all", action="store_true",
help="Include stopped containers (default: running only)")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
load_dotenv(args.env_file)
cfg = get_config()
entities = collect_all_entities(cfg)
log.info("Total entities: %d", len(entities))
ingest_entities(entities, dry_run=args.dry_run)
log.info("Done!")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,36 @@
# Network Device Inventory for network_collector.py
#
# Copy this file to inventory.yaml and fill in your device details.
# Fields in 'defaults' apply to all devices unless overridden per-device.
defaults:
site: main
role: Network Device
username: admin
password: cisco
secret: cisco # enable secret (if required)
driver: ios # NAPALM driver: ios, iosxr, eos, junos, nxos
timeout: 60
devices:
# Cisco IOS routers
- host: 10.10.20.1
driver: ios
role: Router
# Cisco IOS switches
- host: 10.10.20.55
driver: ios
role: Switch
# Cisco IOS-XR devices
# - host: 10.10.20.100
# driver: iosxr
# role: Router
# Brocade switches (requires napalm-ruckus-fastiron or use netmiko fallback)
# - host: 10.10.20.200
# driver: ros # or use custom driver name
# role: Switch
# optional_args:
# transport: ssh

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,382 @@
#!/usr/bin/env python3
"""Observium collector for NetBox via Diode SDK.
Pulls device, port, and IP data from the Observium REST API for brownfield
import into NetBox. Adds Observium device IDs as custom fields for cross-referencing.
Note: Observium API requires Professional or Enterprise edition.
Community Edition users can skip this collector.
Usage:
python collectors/observium_collector.py --dry-run
python collectors/observium_collector.py
"""
import argparse
import logging
import os
import sys
import requests
from requests.auth import HTTPBasicAuth
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
CustomFieldValue,
Device,
DeviceRole,
DeviceType,
Entity,
Interface,
IPAddress,
Manufacturer,
Platform,
Site,
VLAN,
)
log = logging.getLogger("observium-collector")
# ---------------------------------------------------------------------------
# Observium → NetBox mappings
# ---------------------------------------------------------------------------
OBSERVIUM_STATUS_MAP = {
"1": "active", # up
"0": "offline", # down
}
IF_TYPE_MAP = {
"ethernetCsmacd": "1000base-t",
"gigabitEthernet": "1000base-t",
"fastEthernet": "100base-tx",
"propVirtual": "virtual",
"l3ipvlan": "virtual",
"tunnel": "virtual",
"softwareLoopback": "virtual",
"ieee8023adLag": "lag",
"bridge": "bridge",
"other": "other",
}
OS_TO_PLATFORM = {
"ios": "Cisco IOS",
"iosxe": "Cisco IOS-XE",
"iosxr": "Cisco IOS-XR",
"nxos": "Cisco NX-OS",
"junos": "Juniper Junos",
"linux": "Linux",
"vmware": "VMware ESXi",
"ironware": "Brocade IronWare",
"fastiron": "Brocade FastIron",
"windows": "Windows",
"freebsd": "FreeBSD",
"proxmox": "Proxmox VE",
}
OS_TO_MANUFACTURER = {
"ios": "Cisco",
"iosxe": "Cisco",
"iosxr": "Cisco",
"nxos": "Cisco",
"junos": "Juniper",
"ironware": "Brocade",
"fastiron": "Brocade",
}
OS_TO_ROLE = {
"ios": "Router",
"iosxe": "Router",
"iosxr": "Router",
"nxos": "Switch",
"junos": "Router",
"ironware": "Switch",
"fastiron": "Switch",
"linux": "Server",
"vmware": "Hypervisor",
"windows": "Server",
"proxmox": "Hypervisor",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_config() -> dict:
return {
"url": os.environ.get("OBSERVIUM_URL", ""),
"user": os.environ.get("OBSERVIUM_USER", "admin"),
"password": os.environ.get("OBSERVIUM_PASSWORD", ""),
"site": os.environ.get("OBSERVIUM_SITE", "main"),
"default_role": os.environ.get("OBSERVIUM_DEFAULT_ROLE", "Network Device"),
}
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def api_get(base_url: str, endpoint: str, auth: HTTPBasicAuth,
params: dict | None = None) -> dict:
"""Make a GET request to the Observium API."""
url = f"{base_url}/{endpoint}"
resp = requests.get(url, auth=auth, params=params, timeout=30, verify=False)
resp.raise_for_status()
return resp.json()
# ---------------------------------------------------------------------------
# Device reference helper
# ---------------------------------------------------------------------------
def _device_ref(name: str, model: str, manufacturer: str, role: str,
site_name: str) -> Device:
return Device(
name=name,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=manufacturer),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
)
# ---------------------------------------------------------------------------
# Data collection and entity building
# ---------------------------------------------------------------------------
def collect_all_entities(cfg: dict) -> list[Entity]:
base_url = cfg["url"].rstrip("/")
if not base_url:
log.error("OBSERVIUM_URL not set")
sys.exit(1)
auth = HTTPBasicAuth(cfg["user"], cfg["password"])
site_name = cfg["site"]
default_role = cfg["default_role"]
entities: list[Entity] = []
# --- Devices ---
log.info("Fetching devices from Observium...")
try:
devices_resp = api_get(base_url, "devices", auth)
except Exception as exc:
log.error("Failed to fetch devices: %s", exc)
sys.exit(1)
devices = devices_resp.get("devices", {})
if isinstance(devices, list):
devices = {str(i): d for i, d in enumerate(devices)}
log.info("Found %d devices", len(devices))
device_id_to_name = {}
for dev_id, dev_data in devices.items():
hostname = dev_data.get("hostname") or dev_data.get("sysName") or f"device-{dev_id}"
device_id_to_name[dev_id] = hostname
os_type = dev_data.get("os") or ""
model = dev_data.get("hardware") or "Unknown"
vendor = dev_data.get("vendor") or OS_TO_MANUFACTURER.get(os_type, "Unknown")
serial = dev_data.get("serial") or ""
status = "active" if dev_data.get("status") == "1" else "offline"
role = OS_TO_ROLE.get(os_type, default_role)
platform = OS_TO_PLATFORM.get(os_type)
custom_fields = {
"observium_device_id": CustomFieldValue(text=str(dev_id)),
}
device_kwargs = dict(
name=hostname,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=vendor),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
serial=serial[:50] if serial else "",
status=status,
tags=["observium"],
custom_fields=custom_fields,
)
if platform:
device_kwargs["platform"] = Platform(name=platform)
entities.append(Entity(device=Device(**device_kwargs)))
# --- Ports (interfaces) ---
log.info("Fetching ports from Observium...")
try:
ports_resp = api_get(base_url, "ports", auth)
ports = ports_resp.get("ports", {})
if isinstance(ports, list):
ports = {str(i): p for i, p in enumerate(ports)}
log.info("Found %d ports", len(ports))
except Exception as exc:
log.warning("Failed to fetch ports: %s", exc)
ports = {}
port_id_to_info = {}
for port_id, port_data in ports.items():
dev_id = str(port_data.get("device_id", ""))
hostname = device_id_to_name.get(dev_id)
if not hostname:
continue
iface_name = port_data.get("ifName") or port_data.get("port_label") or f"port{port_id}"
if_type = port_data.get("ifType") or "other"
mac = port_data.get("ifPhysAddress") or ""
speed = int(port_data.get("ifSpeed", 0) or 0) // 1000000 # bps → Mbps
mtu = int(port_data.get("ifMtu", 0) or 0)
description = port_data.get("ifAlias") or ""
enabled = port_data.get("ifAdminStatus") == "up"
iface_type = IF_TYPE_MAP.get(if_type, "other")
if iface_type == "other" and speed:
from collectors.network_collector import SPEED_TO_TYPE
iface_type = SPEED_TO_TYPE.get(speed, "other")
# Look up device model for reference
dev_data = devices.get(dev_id, {})
os_type = dev_data.get("os") or ""
model = dev_data.get("hardware") or "Unknown"
vendor = dev_data.get("vendor") or OS_TO_MANUFACTURER.get(os_type, "Unknown")
role = OS_TO_ROLE.get(os_type, default_role)
dev_ref = _device_ref(hostname, model, vendor, role, site_name)
entities.append(Entity(interface=Interface(
device=dev_ref,
name=iface_name[:64],
type=iface_type,
mac_address=mac,
mtu=mtu,
speed=speed * 1000 if speed else 0, # Mbps → Kbps
enabled=enabled,
description=description[:200] if description else "",
tags=["observium"],
)))
port_id_to_info[port_id] = (hostname, iface_name, iface_type, model, vendor, role)
# --- IP addresses ---
log.info("Fetching IP addresses from Observium...")
try:
# Observium may expose IPs through the ports/addresses endpoint
for port_id, (hostname, iface_name, iface_type, model, vendor, role) in port_id_to_info.items():
try:
addr_resp = api_get(base_url, f"ports/{port_id}/ip", auth)
addresses = addr_resp.get("addresses", {})
if isinstance(addresses, list):
addresses = {str(i): a for i, a in enumerate(addresses)}
for addr_id, addr_data in addresses.items():
ip = addr_data.get("ipv4_address") or addr_data.get("ipv6_address") or ""
prefix_len = addr_data.get("ipv4_prefixlen") or addr_data.get("ipv6_prefixlen") or ""
if ip and prefix_len:
ip_str = f"{ip}/{prefix_len}"
dev_ref = _device_ref(hostname, model, vendor, role, site_name)
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=dev_ref,
name=iface_name,
type=iface_type,
),
tags=["observium"],
)))
except Exception:
pass # IP endpoint may not be available for all ports
except Exception as exc:
log.warning("IP address collection failed: %s", exc)
return entities
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
if not entities:
log.warning("No entities to ingest")
return
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
client_id = os.environ.get("DIODE_CLIENT_ID",
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
os.environ.get("INGESTER_CLIENT_SECRET", ""))
if dry_run:
log.info("DRY RUN: %d entities would be ingested", len(entities))
for i, e in enumerate(entities):
log.info(" [%d] %s", i, e)
return
if not client_secret:
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
sys.exit(1)
log.info("Ingesting %d entities to %s ...", len(entities), target)
from netboxlabs.diode.sdk.ingester import create_message_chunks
with DiodeClient(
target=target,
client_id=client_id,
client_secret=client_secret,
app_name="observium-collector",
app_version="0.1.0",
) as client:
chunks = create_message_chunks(entities)
for idx, chunk in enumerate(chunks):
resp = client.ingest(entities=chunk)
if resp.errors:
log.error("Chunk %d errors: %s", idx, resp.errors)
else:
log.info("Chunk %d: %d entities ingested", idx, len(chunk))
def main():
parser = argparse.ArgumentParser(description="Observium collector for NetBox")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
load_dotenv(args.env_file)
cfg = get_config()
entities = collect_all_entities(cfg)
log.info("Total entities: %d", len(entities))
ingest_entities(entities, dry_run=args.dry_run)
log.info("Done!")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,548 @@
#!/usr/bin/env python3
"""VMware vSphere collector for NetBox via Diode SDK.
Discovers ESXi hosts, VMs, interfaces, IPs, and disks from a vCenter
or standalone ESXi host and ingests them into NetBox via the Diode pipeline.
Usage:
python collectors/vmware_collector.py --dry-run
python collectors/vmware_collector.py
"""
import argparse
import atexit
import logging
import os
import re
import ssl
import sys
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim, vmodl
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
Cluster,
ClusterGroup,
ClusterType,
Device,
DeviceRole,
DeviceType,
Entity,
Interface,
IPAddress,
Manufacturer,
Platform,
Site,
VirtualDisk,
VirtualMachine,
VMInterface,
)
log = logging.getLogger("vmware-collector")
# ---------------------------------------------------------------------------
# Status mappings
# ---------------------------------------------------------------------------
VM_POWER_STATE_MAP = {
"poweredOn": "active",
"poweredOff": "offline",
"suspended": "offline",
}
HOST_STATUS_MAP = {
"green": "active",
"yellow": "active",
"red": "failed",
"gray": "planned",
}
SPEED_TO_TYPE = {
100: "100base-tx",
1000: "1000base-t",
2500: "2.5gbase-t",
10000: "10gbase-x-sfpp",
25000: "25gbase-x-sfp28",
40000: "40gbase-x-qsfpp",
100000: "100gbase-x-qsfp28",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_config() -> dict:
return {
"host": os.environ.get("VCENTER_HOST", ""),
"user": os.environ.get("VCENTER_USER", "administrator@vsphere.local"),
"password": os.environ.get("VCENTER_PASSWORD", ""),
"port": int(os.environ.get("VCENTER_PORT", "443")),
"verify_ssl": os.environ.get("VCENTER_VERIFY_SSL", "false").lower() == "true",
"site": os.environ.get("VCENTER_SITE", "main"),
}
# ---------------------------------------------------------------------------
# Reference helpers
# ---------------------------------------------------------------------------
def _device_ref(name: str, model: str, manufacturer: str, role: str,
site_name: str) -> Device:
return Device(
name=name,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=manufacturer),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
)
def _vm_ref(name: str, cluster_name: str, site_name: str,
role: str = "Virtual Machine") -> VirtualMachine:
return VirtualMachine(
name=name,
site=Site(name=site_name),
cluster=Cluster(
name=cluster_name,
type=ClusterType(name="VMware ESXi"),
scope_site=Site(name=site_name),
),
role=DeviceRole(name=role),
)
# ---------------------------------------------------------------------------
# vSphere connection
# ---------------------------------------------------------------------------
def connect_vsphere(cfg: dict):
"""Connect to vCenter/ESXi and return ServiceInstance."""
host = cfg["host"]
if not host:
log.error("VCENTER_HOST not set")
sys.exit(1)
context = None
if not cfg["verify_ssl"]:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
si = SmartConnect(
host=host,
user=cfg["user"],
pwd=cfg["password"],
port=cfg["port"],
sslContext=context,
)
atexit.register(Disconnect, si)
log.info("Connected to vSphere: %s", host)
return si
def get_all_objects(si, obj_type, folder=None):
"""Get all managed objects of a given type."""
content = si.RetrieveContent()
container = content.viewManager.CreateContainerView(
folder or content.rootFolder, [obj_type], True
)
objects = list(container.view)
container.Destroy()
return objects
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def build_cluster_entities(si, site_name: str) -> list[Entity]:
"""Build Cluster entities from vSphere clusters."""
entities = []
clusters = get_all_objects(si, vim.ClusterComputeResource)
for cluster in clusters:
dc_name = ""
parent = cluster.parent
while parent:
if isinstance(parent, vim.Datacenter):
dc_name = parent.name
break
parent = getattr(parent, "parent", None)
entities.append(Entity(cluster=Cluster(
name=cluster.name,
type=ClusterType(name="VMware ESXi"),
scope_site=Site(name=site_name),
group=ClusterGroup(name=dc_name) if dc_name else None,
status="active",
tags=["vmware"],
)))
log.info(" Cluster: %s (DC: %s)", cluster.name, dc_name or "none")
return entities
def build_host_entities(si, site_name: str) -> tuple[list[Entity], dict]:
"""Build Device entities from ESXi hosts. Returns entities and host-to-cluster mapping."""
entities = []
host_cluster_map = {}
hosts = get_all_objects(si, vim.HostSystem)
for host in hosts:
hostname = host.name
hw = host.hardware
sys_info = hw.systemInfo if hw else None
model = sys_info.model if sys_info else "Unknown"
vendor = sys_info.vendor if sys_info else "Unknown"
serial = ""
if sys_info:
for ident in (sys_info.otherIdentifyingInfo or []):
if hasattr(ident, "identifierType") and \
ident.identifierType and \
ident.identifierType.key == "ServiceTag":
serial = ident.identifierValue
break
if not serial:
serial = getattr(sys_info, "serialNumber", "") or ""
status = HOST_STATUS_MAP.get(
str(host.overallStatus) if host.overallStatus else "gray",
"active"
)
# Determine cluster
cluster_name = ""
if isinstance(host.parent, vim.ClusterComputeResource):
cluster_name = host.parent.name
host_cluster_map[hostname] = cluster_name
entities.append(Entity(device=Device(
name=hostname,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=vendor),
),
role=DeviceRole(name="Hypervisor"),
platform=Platform(name="VMware ESXi"),
site=Site(name=site_name),
serial=serial[:50] if serial else "",
status=status,
tags=["vmware"],
)))
# Physical NICs
dev_ref = _device_ref(hostname, model, vendor, "Hypervisor", site_name)
if host.config and host.config.network:
for pnic in (host.config.network.pnic or []):
speed_mbps = 0
if pnic.linkSpeed:
speed_mbps = pnic.linkSpeed.speedMb
iface_type = SPEED_TO_TYPE.get(speed_mbps, "1000base-t")
entities.append(Entity(interface=Interface(
device=dev_ref,
name=pnic.device,
type=iface_type,
mac_address=pnic.mac or "",
speed=speed_mbps * 1000 if speed_mbps else 0,
enabled=True,
tags=["vmware"],
)))
# VMkernel interfaces
for vnic in (host.config.network.vnic or []):
ip_str = ""
if vnic.spec and vnic.spec.ip:
ip = vnic.spec.ip.ipAddress
mask = vnic.spec.ip.subnetMask
if ip:
prefix_len = _mask_to_prefix(mask) if mask else 24
ip_str = f"{ip}/{prefix_len}"
entities.append(Entity(interface=Interface(
device=dev_ref,
name=vnic.device,
type="virtual",
mac_address=vnic.spec.mac if vnic.spec else "",
enabled=True,
tags=["vmware", "vmkernel"],
)))
if ip_str:
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=dev_ref,
name=vnic.device,
type="virtual",
),
tags=["vmware"],
)))
log.info(" Host: %s (%s %s, cluster=%s)",
hostname, vendor, model, cluster_name or "standalone")
return entities, host_cluster_map
def build_vm_entities(si, site_name: str,
host_cluster_map: dict) -> list[Entity]:
"""Build VirtualMachine + VMInterface + VirtualDisk + IPAddress entities."""
entities = []
vms = get_all_objects(si, vim.VirtualMachine)
for vm_obj in vms:
try:
vm_name = vm_obj.name
config = vm_obj.config
if not config:
log.debug(" Skipping VM with no config: %s", vm_name)
continue
# Determine cluster from host
host_name = ""
cluster_name = ""
if vm_obj.runtime and vm_obj.runtime.host:
host_name = vm_obj.runtime.host.name
cluster_name = host_cluster_map.get(host_name, "")
if not cluster_name:
cluster_name = host_name or "standalone"
power_state = str(vm_obj.runtime.powerState) if vm_obj.runtime else "poweredOff"
status = VM_POWER_STATE_MAP.get(power_state, "offline")
# Resources
vcpus = config.hardware.numCPU if config.hardware else 0
memory_mb = config.hardware.memoryMB if config.hardware else 0
total_disk_gb = 0
# Determine platform from guest
guest_os = config.guestFullName or config.guestId or ""
platform_name = None
if "linux" in guest_os.lower() or "ubuntu" in guest_os.lower() or \
"centos" in guest_os.lower() or "debian" in guest_os.lower():
platform_name = "Linux"
elif "windows" in guest_os.lower():
platform_name = "Windows"
# Collect IPs for primary_ip4
primary_ip4 = None
vm_ips = []
# Guest NIC info (requires VMware Tools)
if vm_obj.guest and vm_obj.guest.net:
for guest_nic in vm_obj.guest.net:
if guest_nic.ipConfig:
for ip_entry in guest_nic.ipConfig.ipAddress:
addr = ip_entry.ipAddress
prefix = ip_entry.prefixLength
if addr and not addr.startswith("fe80") and \
not addr.startswith("127."):
ip_str = f"{addr}/{prefix}"
nic_name = guest_nic.network or "eth0"
vm_ips.append((ip_str, nic_name))
if not primary_ip4 and ":" not in addr:
primary_ip4 = ip_str
# VirtualMachine entity
vm_kwargs = dict(
name=vm_name,
status=status,
site=Site(name=site_name),
cluster=Cluster(
name=cluster_name,
type=ClusterType(name="VMware ESXi"),
scope_site=Site(name=site_name),
),
role=DeviceRole(name="Virtual Machine"),
vcpus=vcpus,
memory=memory_mb,
comments=f"Guest: {guest_os}" if guest_os else "",
tags=["vmware"],
)
if platform_name:
vm_kwargs["platform"] = Platform(name=platform_name)
if primary_ip4:
vm_kwargs["primary_ip4"] = IPAddress(address=primary_ip4)
entities.append(Entity(virtual_machine=VirtualMachine(**vm_kwargs)))
# VM NICs
vm_ref = _vm_ref(vm_name, cluster_name, site_name)
if config.hardware and config.hardware.device:
for device in config.hardware.device:
if isinstance(device, vim.vm.device.VirtualEthernetCard):
nic_name = device.deviceInfo.label if device.deviceInfo else f"nic{device.key}"
mac = getattr(device, "macAddress", "") or ""
net_name = ""
if hasattr(device, "backing"):
backing = device.backing
if hasattr(backing, "network") and backing.network:
net_name = backing.network.name
elif hasattr(backing, "deviceName"):
net_name = backing.deviceName
entities.append(Entity(vm_interface=VMInterface(
virtual_machine=vm_ref,
name=nic_name[:64],
enabled=device.connectable.connected if device.connectable else True,
mac_address=mac,
description=net_name[:200] if net_name else "",
tags=["vmware"],
)))
# Virtual Disks
elif isinstance(device, vim.vm.device.VirtualDisk):
disk_name = device.deviceInfo.label if device.deviceInfo else f"disk{device.key}"
disk_size_gb = device.capacityInKB // (1024 * 1024) if device.capacityInKB else 0
total_disk_gb += disk_size_gb
if disk_size_gb > 0:
entities.append(Entity(virtual_disk=VirtualDisk(
virtual_machine=vm_ref,
name=disk_name[:64],
size=disk_size_gb,
tags=["vmware"],
)))
# IP entities from guest tools
for ip_str, nic_name in vm_ips:
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_vm_interface=VMInterface(
virtual_machine=vm_ref,
name=nic_name[:64],
),
tags=["vmware"],
)))
log.info(" VM: %s (%s, %d vCPU, %d MB RAM, %d GB disk)",
vm_name, status, vcpus, memory_mb, total_disk_gb)
except Exception as exc:
log.error(" Failed to process VM %s: %s",
getattr(vm_obj, "name", "?"), exc)
return entities
def _mask_to_prefix(mask: str) -> int:
"""Convert subnet mask to prefix length."""
try:
return sum(bin(int(x)).count("1") for x in mask.split("."))
except (ValueError, AttributeError):
return 24
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def collect_all_entities(cfg: dict) -> list[Entity]:
si = connect_vsphere(cfg)
site_name = cfg["site"]
entities: list[Entity] = []
# Clusters
entities.extend(build_cluster_entities(si, site_name))
# ESXi hosts + interfaces
host_entities, host_cluster_map = build_host_entities(si, site_name)
entities.extend(host_entities)
# VMs
entities.extend(build_vm_entities(si, site_name, host_cluster_map))
return entities
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
if not entities:
log.warning("No entities to ingest")
return
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
client_id = os.environ.get("DIODE_CLIENT_ID",
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
os.environ.get("INGESTER_CLIENT_SECRET", ""))
if dry_run:
log.info("DRY RUN: %d entities would be ingested", len(entities))
for i, e in enumerate(entities):
log.info(" [%d] %s", i, e)
return
if not client_secret:
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
sys.exit(1)
log.info("Ingesting %d entities to %s ...", len(entities), target)
from netboxlabs.diode.sdk.ingester import create_message_chunks
with DiodeClient(
target=target,
client_id=client_id,
client_secret=client_secret,
app_name="vmware-collector",
app_version="0.1.0",
) as client:
chunks = create_message_chunks(entities)
for idx, chunk in enumerate(chunks):
resp = client.ingest(entities=chunk)
if resp.errors:
log.error("Chunk %d errors: %s", idx, resp.errors)
else:
log.info("Chunk %d: %d entities ingested", idx, len(chunk))
def main():
parser = argparse.ArgumentParser(description="VMware vSphere collector for NetBox")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
load_dotenv(args.env_file)
cfg = get_config()
entities = collect_all_entities(cfg)
log.info("Total entities: %d", len(entities))
ingest_entities(entities, dry_run=args.dry_run)
log.info("Done!")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,376 @@
#!/usr/bin/env python3
"""Zabbix collector for NetBox via Diode SDK.
Pulls device inventory from Zabbix API for brownfield import into NetBox.
Also adds Zabbix host IDs as custom fields for cross-referencing.
Usage:
python collectors/zabbix_collector.py --dry-run
python collectors/zabbix_collector.py
"""
import argparse
import logging
import os
import sys
from pyzabbix import ZabbixAPI
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
CustomFieldValue,
Device,
DeviceRole,
DeviceType,
Entity,
Interface,
IPAddress,
Manufacturer,
Platform,
Site,
)
log = logging.getLogger("zabbix-collector")
# ---------------------------------------------------------------------------
# Zabbix → NetBox mappings
# ---------------------------------------------------------------------------
ZABBIX_STATUS_MAP = {
0: "active", # Monitored
1: "offline", # Unmonitored
}
ZABBIX_IFACE_TYPE = {
1: "agent", # Zabbix agent
2: "snmp", # SNMP
3: "ipmi", # IPMI
4: "jmx", # JMX
}
# Best-effort OS → Platform mapping from Zabbix template groups
OS_KEYWORDS_TO_PLATFORM = {
"linux": "Linux",
"windows": "Windows",
"cisco": "Cisco IOS",
"juniper": "Juniper Junos",
"freebsd": "FreeBSD",
"vmware": "VMware ESXi",
"proxmox": "Proxmox VE",
"ubuntu": "Ubuntu",
"debian": "Debian",
"centos": "CentOS",
"rhel": "Red Hat Enterprise Linux",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_config() -> dict:
return {
"url": os.environ.get("ZABBIX_URL", ""),
"user": os.environ.get("ZABBIX_USER", "Admin"),
"password": os.environ.get("ZABBIX_PASSWORD", ""),
"api_token": os.environ.get("ZABBIX_API_TOKEN", ""),
"site": os.environ.get("ZABBIX_SITE", "main"),
"default_role": os.environ.get("ZABBIX_DEFAULT_ROLE", "Server"),
"group_to_role": {}, # Could be loaded from config
}
# ---------------------------------------------------------------------------
# Device reference helper
# ---------------------------------------------------------------------------
def _device_ref(name: str, model: str, manufacturer: str, role: str,
site_name: str) -> Device:
return Device(
name=name,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=manufacturer),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
)
# ---------------------------------------------------------------------------
# Data collection
# ---------------------------------------------------------------------------
def connect_zabbix(cfg: dict) -> ZabbixAPI:
url = cfg["url"]
if not url:
log.error("ZABBIX_URL not set")
sys.exit(1)
zapi = ZabbixAPI(url)
zapi.session.verify = False
if cfg.get("api_token"):
zapi.login(api_token=cfg["api_token"])
else:
zapi.login(cfg["user"], cfg["password"])
log.info("Connected to Zabbix %s", zapi.api_version())
return zapi
def collect_hosts(zapi: ZabbixAPI) -> list[dict]:
"""Fetch all hosts with their interfaces and inventory data."""
hosts = zapi.host.get(
output=["hostid", "host", "name", "status", "description"],
selectInterfaces=["interfaceid", "ip", "dns", "port", "type", "main", "useip"],
selectInventory="extend",
selectGroups=["name"],
selectParentTemplates=["name"],
)
log.info("Fetched %d hosts from Zabbix", len(hosts))
return hosts
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def guess_platform(host_data: dict) -> str | None:
"""Try to guess platform from Zabbix templates/groups/inventory."""
# Check inventory OS field
inventory = host_data.get("inventory") or {}
if isinstance(inventory, dict):
os_full = inventory.get("os_full") or inventory.get("os_short") or ""
for keyword, platform in OS_KEYWORDS_TO_PLATFORM.items():
if keyword.lower() in os_full.lower():
return platform
# Check template names
templates = host_data.get("parentTemplates") or []
for tmpl in templates:
tmpl_name = (tmpl.get("name") or "").lower()
for keyword, platform in OS_KEYWORDS_TO_PLATFORM.items():
if keyword in tmpl_name:
return platform
return None
def guess_role(host_data: dict, default_role: str) -> str:
"""Try to guess device role from Zabbix groups."""
groups = host_data.get("groups") or []
for group in groups:
gname = (group.get("name") or "").lower()
if "router" in gname:
return "Router"
if "switch" in gname:
return "Switch"
if "firewall" in gname:
return "Firewall"
if "hypervisor" in gname:
return "Hypervisor"
if "server" in gname or "linux" in gname or "windows" in gname:
return "Server"
return default_role
def build_host_entities(host_data: dict, cfg: dict) -> list[Entity]:
"""Build Device + Interface + IPAddress entities from a Zabbix host."""
entities = []
site_name = cfg["site"]
default_role = cfg["default_role"]
hostname = host_data.get("name") or host_data.get("host", "unknown")
status = ZABBIX_STATUS_MAP.get(int(host_data.get("status", 0)), "active")
host_id = host_data.get("hostid", "")
# Inventory data
inventory = host_data.get("inventory") or {}
if isinstance(inventory, list):
inventory = {}
serial = inventory.get("serialno_a") or ""
model = inventory.get("model") or inventory.get("hardware") or "Unknown"
vendor = inventory.get("vendor") or inventory.get("hardware_full") or "Unknown"
asset_tag = inventory.get("asset_tag") or ""
role = guess_role(host_data, default_role)
platform = guess_platform(host_data)
# Custom fields for cross-referencing
custom_fields = {}
if host_id:
custom_fields["zabbix_host_id"] = CustomFieldValue(text=str(host_id))
# Device entity
device_kwargs = dict(
name=hostname,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=vendor),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
status=status,
serial=serial[:50] if serial else "",
asset_tag=asset_tag[:50] if asset_tag else "",
tags=["zabbix"],
)
if platform:
device_kwargs["platform"] = Platform(name=platform)
if custom_fields:
device_kwargs["custom_fields"] = custom_fields
entities.append(Entity(device=Device(**device_kwargs)))
# Interface + IP entities from Zabbix interfaces
dev_ref = _device_ref(hostname, model, vendor, role, site_name)
primary_ip = None
for iface in host_data.get("interfaces", []):
iface_type_num = int(iface.get("type", 1))
iface_type_name = ZABBIX_IFACE_TYPE.get(iface_type_num, "agent")
ip = iface.get("ip", "")
is_main = str(iface.get("main", "0")) == "1"
if not ip or ip == "0.0.0.0":
continue
# Interface name based on type
iface_name = f"zabbix-{iface_type_name}"
if is_main and iface_type_name == "agent":
iface_name = "mgmt0"
elif iface_type_name == "snmp":
iface_name = "snmp0"
elif iface_type_name == "ipmi":
iface_name = "ipmi0"
entities.append(Entity(interface=Interface(
device=dev_ref,
name=iface_name,
type="other",
enabled=True,
tags=["zabbix"],
)))
# IP address
ip_str = f"{ip}/32" # Zabbix doesn't provide prefix length
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=dev_ref,
name=iface_name,
type="other",
),
tags=["zabbix"],
)))
if is_main and primary_ip is None:
primary_ip = ip_str
return entities
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def collect_all_entities(cfg: dict) -> list[Entity]:
zapi = connect_zabbix(cfg)
hosts = collect_hosts(zapi)
entities: list[Entity] = []
for host in hosts:
try:
entities.extend(build_host_entities(host, cfg))
except Exception as exc:
hostname = host.get("name") or host.get("host", "?")
log.error("Failed to process Zabbix host %s: %s", hostname, exc)
return entities
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
if not entities:
log.warning("No entities to ingest")
return
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
client_id = os.environ.get("DIODE_CLIENT_ID",
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
os.environ.get("INGESTER_CLIENT_SECRET", ""))
if dry_run:
log.info("DRY RUN: %d entities would be ingested", len(entities))
for i, e in enumerate(entities):
log.info(" [%d] %s", i, e)
return
if not client_secret:
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
sys.exit(1)
log.info("Ingesting %d entities to %s ...", len(entities), target)
from netboxlabs.diode.sdk.ingester import create_message_chunks
with DiodeClient(
target=target,
client_id=client_id,
client_secret=client_secret,
app_name="zabbix-collector",
app_version="0.1.0",
) as client:
chunks = create_message_chunks(entities)
for idx, chunk in enumerate(chunks):
resp = client.ingest(entities=chunk)
if resp.errors:
log.error("Chunk %d errors: %s", idx, resp.errors)
else:
log.info("Chunk %d: %d entities ingested", idx, len(chunk))
def main():
parser = argparse.ArgumentParser(description="Zabbix collector for NetBox")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
load_dotenv(args.env_file)
cfg = get_config()
entities = collect_all_entities(cfg)
log.info("Total entities: %d", len(entities))
ingest_entities(entities, dry_run=args.dry_run)
log.info("Done!")
if __name__ == "__main__":
main()