Compare commits

...

8 Commits

Author SHA1 Message Date
sam
d70cd8975c Add IOS-XR support to network collector for CML devices
Adds Netmiko-based collection for Cisco IOS-XR devices (CML lab routers)
that lack the XML agent required by NAPALM's iosxr driver. Includes
dedicated parsers for IOS-XR show version, show interfaces, show ipv4/ipv6
interface brief, show inventory, and show running-config. Collects
Bundle-Ether (LAG), Loopback, and physical interfaces with IPs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 17:31:19 -07:00
sam
a6180196e9 Fix MTU=0 validation and create_message_chunks import errors
- Send mtu=None instead of mtu=0 (NetBox requires MTU >= 1)
- Remove create_message_chunks usage (not in installed SDK version)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 16:58:18 -07:00
sam
37b92c166a Fix create_message_chunks import error across all collectors
The installed Diode SDK version does not export create_message_chunks.
Replace chunked ingestion with direct client.ingest() calls.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 16:49:25 -07:00
sam
427dbf456d Fix network collector float-to-int cast for interface speed and MTU
NAPALM can return speed/mtu as floats, but the Diode SDK expects integers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 16:30:36 -07:00
sam
40d3814316 Fix UniFi collector Prefix site parameter for Diode SDK
Prefix uses scope_site instead of site in the Diode SDK ingester.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 16:26:58 -07:00
sam
5748bad765 Add PBS collector, multi-host PVE support, and collector fixes
- proxmox_collector: support numbered PVE_HOST_1/2/3 env vars with
  backward compat for legacy single PVE_HOST; fix MTU string-to-int cast
- pbs_collector: new collector for Proxmox Backup Server — discovers
  devices, interfaces, IPs, and datastores (as Services) via PBS API
- vmware_collector: fix mac_address → primary_mac_address for Diode SDK
- network_collector: add Netmiko SSH fallback for Brocade/NOS devices,
  add Brocade ICX interface type patterns
- unifi_collector: new collector for UniFi UDM-SE/switches/APs
- ENV_REFERENCE.md: document all collector env vars and setup steps
- .gitignore: exclude collectors/inventory.yaml (contains credentials)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 16:10:12 -07:00
sam
b4fcdfa277 Add network, CML, Zabbix, Observium, VMware, and Docker collectors
Six new collectors for ingesting infrastructure data into NetBox via
the Diode SDK pipeline:

- network_collector: Cisco/Brocade devices via NAPALM + pyATS/Genie
  with LLDP/CDP cable discovery, VLANs, VRFs, prefixes, device configs,
  inventory items, and BGP push to netbox-bgp plugin API
- cml_collector: Cisco Modeling Labs topology sync (nodes, links, configs)
- zabbix_collector: Brownfield import from Zabbix API with cross-ref
  custom fields
- observium_collector: Device/port/IP import from Observium REST API
- vmware_collector: vCenter/ESXi hosts, VMs, interfaces, disks, IPs
- docker_collector: Container discovery via Docker API (tested: 21
  containers found on local host)

Also adds inventory.yaml.example template for network device credentials.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 03:17:40 -07:00
sam
a5b37c0dd5 Add Proxmox VE collector for NetBox ingestion via Diode SDK
Single-file collector that discovers PVE host infrastructure (nodes,
LXC containers, QEMU VMs, interfaces, IPs, disks) and ingests it
into NetBox through the Diode pipeline. Supports DHCP IP discovery
via PVE runtime interfaces API and two-pass convergence for
primary_ip4 assignment.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 02:37:02 -07:00
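
The MTU and speed fixes described in commits 427dbf456d and a6180196e9 amount to normalizing numeric values before they reach the Diode SDK. A minimal sketch of that idea follows; the helper name `to_positive_int` is hypothetical and is not taken from the network collector, whose diff is suppressed on this page. It assumes that any value below 1 should be dropped rather than sent.

```python
from typing import Optional


def to_positive_int(value) -> Optional[int]:
    """Coerce a NAPALM-style numeric value to an int, or None if unusable.

    Hypothetical helper: floats and numeric strings are cast to int, and
    anything below 1 (including 0) becomes None so the field is omitted.
    """
    if value is None:
        return None
    try:
        number = int(float(value))
    except (TypeError, ValueError, OverflowError):
        # Non-numeric strings, NaN, and infinity all end up here.
        return None
    return number if number >= 1 else None


if __name__ == "__main__":
    print(to_positive_int(1500.0))
    print(to_positive_int(0))
```

Returning `None` instead of `0` matches the commit note that NetBox requires MTU >= 1; whether the same lower bound should apply to interface speed is a judgment call.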
13 changed files with 6381 additions and 0 deletions

1
.gitignore vendored

@@ -6,3 +6,4 @@ __pycache__/
# Generated by setup.sh (contain secrets)
oauth2/client/client-credentials.json
orb-agent/agent.yaml
collectors/inventory.yaml

245
collectors/ENV_REFERENCE.md Normal file

@@ -0,0 +1,245 @@
# Environment Variables Reference
All variables go in `.env` at the project root. Variables marked **[HAVE]**
are already configured. Variables marked **[NEED]** are what you need to gather.
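
The collectors read `.env` with a small built-in parser rather than a third-party library. The sketch below mirrors the `load_dotenv` function in `cml_collector.py`; the temporary file and the `DEMO_*` variable names are illustrative assumptions. Because the parser uses `os.environ.setdefault`, a variable already exported in the shell takes precedence over the value in `.env`.

```python
import os
import tempfile


def load_dotenv(path: str = ".env") -> None:
    """Load KEY=VALUE pairs into os.environ without overriding existing keys."""
    if not os.path.isfile(path):
        return
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            # Skip blanks, full-line comments, and lines without an assignment.
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, val = line.partition("=")
            os.environ.setdefault(key.strip(), val.strip().strip("\"'"))


# Demo with hypothetical variable names.
os.environ["DEMO_SITE"] = "from-shell"
with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as tmp:
    tmp.write("# comment\nDEMO_SITE=from-file\nDEMO_HOST='10.0.0.1'\n")
load_dotenv(tmp.name)
os.unlink(tmp.name)
print(os.environ["DEMO_SITE"], os.environ["DEMO_HOST"])
```

Note that this parser, as written in `cml_collector.py` and `docker_collector.py`, does not strip trailing `# ...` text, so inline comments copied verbatim from the examples in this file would become part of the value. Keeping comments on their own lines in the real `.env` avoids that.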
---
## Diode Pipeline [HAVE]
Already configured and working.
```bash
INGESTER_CLIENT_ID=diode-ingester
INGESTER_CLIENT_SECRET=<already set>
NETBOX_API_URL=http://172.19.77.160:8000
NETBOX_API_TOKEN=nbt_<already set>
```
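
The collectors shown in this diff look up Diode credentials under `DIODE_*` names first and fall back to the `INGESTER_*` names above. A minimal sketch of that lookup, based on `ingest_entities` in `cml_collector.py`; the function name `resolve_diode_settings` and the example secret are hypothetical.

```python
def resolve_diode_settings(env: dict) -> dict:
    """Return Diode connection settings, preferring DIODE_* over INGESTER_*."""
    return {
        "target": env.get("DIODE_TARGET", "grpc://localhost:8080/diode"),
        "client_id": env.get(
            "DIODE_CLIENT_ID", env.get("INGESTER_CLIENT_ID", "diode-ingester")
        ),
        "client_secret": env.get(
            "DIODE_CLIENT_SECRET", env.get("INGESTER_CLIENT_SECRET", "")
        ),
    }


# Only the INGESTER_* secret is set, as in the block above.
settings = resolve_diode_settings({"INGESTER_CLIENT_SECRET": "example-secret"})
print(settings["client_id"], settings["target"])
```

In the collectors an empty `client_secret` aborts a real ingest run, while `--dry-run` proceeds without it.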
## Proxmox VE Collector [HAVE — partial]
Supports multiple standalone PVE hosts. prox940 already configured.
```bash
# Legacy single-host (still works)
PVE_HOST=192.168.1.190
PVE_USER=root@pam
PVE_TOKEN_NAME=diode
PVE_TOKEN_VALUE=<already set>
PVE_VERIFY_SSL=false
# Additional PVE hosts (numbered)
PVE_HOST_2=10.40.40.107 # proxmox2
PVE_USER_2=diode@pve
PVE_TOKEN_NAME_2=diode
PVE_TOKEN_VALUE_2= # NEED — create token on proxmox2
PVE_HOST_3=10.40.40.110 # proxmox3
PVE_USER_3=diode@pve
PVE_TOKEN_NAME_3=diode
PVE_TOKEN_VALUE_3= # NEED — create token on proxmox3
```
**Setup on each PVE host:**
```bash
pveum user add diode@pve --comment "Diode NetBox collector"
pveum aclmod / -user diode@pve -role PVEAuditor
pveum user token add diode@pve diode --privsep 0 --comment "NetBox Diode"
```
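
One way the numbered `PVE_HOST_N` variables could be combined with the legacy unsuffixed names is sketched below. This is an assumption about the approach, not the code in `proxmox_collector.py` (which is not shown in this diff); `discover_hosts` and its `max_hosts` limit are hypothetical.

```python
def discover_hosts(env: dict, prefix: str = "PVE", max_hosts: int = 9) -> list[dict]:
    """Collect host configs from PREFIX_HOST (legacy) and PREFIX_HOST_N."""
    fields = ("HOST", "USER", "TOKEN_NAME", "TOKEN_VALUE")
    hosts = []
    # Legacy single-host variables carry no numeric suffix.
    if env.get(f"{prefix}_HOST"):
        hosts.append({f.lower(): env.get(f"{prefix}_{f}", "") for f in fields})
    # Numbered variables: PREFIX_HOST_1, PREFIX_HOST_2, ...
    for n in range(1, max_hosts + 1):
        host = env.get(f"{prefix}_HOST_{n}")
        if not host:
            continue
        entry = {f.lower(): env.get(f"{prefix}_{f}_{n}", "") for f in fields}
        # Avoid listing the same address twice if legacy and numbered overlap.
        if all(entry["host"] != h["host"] for h in hosts):
            hosts.append(entry)
    return hosts


example_env = {
    "PVE_HOST": "192.168.1.190", "PVE_USER": "root@pam",
    "PVE_HOST_2": "10.40.40.107", "PVE_USER_2": "diode@pve",
}
found = discover_hosts(example_env)
print([h["host"] for h in found])
```

Gaps in the numbering (for example, no `PVE_HOST_1`) are skipped rather than treated as the end of the list, which matches the layout shown above where the legacy host sits alongside hosts 2 and 3.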
## Proxmox Backup Server Collector [NEED]
```bash
PBS_HOST_1=10.40.40.150 # PBS-01
PBS_USER_1=diode@pbs
PBS_TOKEN_NAME_1=diode
PBS_TOKEN_VALUE_1= # NEED — create token on PBS-01
PBS_HOST_2=192.168.1.241 # PBS-02
PBS_USER_2=diode@pbs
PBS_TOKEN_NAME_2=diode
PBS_TOKEN_VALUE_2= # NEED — create token on PBS-02
PBS_HOST_3=pbs.apodacalabs.com # PBS
PBS_USER_3=diode@pbs
PBS_TOKEN_NAME_3=diode
PBS_TOKEN_VALUE_3= # NEED — create token on PBS
```
**Setup on each PBS host:**
```bash
proxmox-backup-manager user create diode@pbs --comment "Diode NetBox collector"
proxmox-backup-manager acl update / Audit --auth-id diode@pbs
proxmox-backup-manager user generate-token diode@pbs diode
```
---
## Network Collector [NEED]
Credentials go in `collectors/inventory.yaml`, not `.env`.
No environment variables are required for this collector:
```bash
# Everything is configured in inventory.yaml.
# To skip pyATS even when it is installed, pass the --no-pyats flag.
```
## CML Topology Collector [NEED]
```bash
CML_HOST= # CML controller IP or hostname (e.g., 10.40.40.50)
CML_USER=admin # CML admin username
CML_PASSWORD= # CML admin password
CML_LAB= # Optional: specific lab name/ID (blank = all labs)
CML_VERIFY_SSL=false # Set true if CML has valid TLS cert
CML_SITE=CML # NetBox site name for CML devices (default: CML)
```
**Setup on CML side:** Just need the controller address and admin creds.
The virl2_client library handles the REST API.
## Zabbix Collector [NEED]
```bash
ZABBIX_URL= # Full URL to API (e.g., http://10.40.40.20/api_jsonrpc.php)
ZABBIX_USER=Admin # Zabbix username
ZABBIX_PASSWORD= # Zabbix password
ZABBIX_API_TOKEN= # OR use an API token instead of user/pass (Zabbix 5.4+)
ZABBIX_SITE=main # NetBox site to assign devices to
ZABBIX_DEFAULT_ROLE=Server # Default role if group-based detection fails
```
**Setup on Zabbix side:** No setup needed — just need read access creds.
If using API token (Zabbix 5.4+): Administration → API tokens → Create.
## Observium Collector [NEED]
```bash
OBSERVIUM_URL= # API base URL (e.g., http://10.40.40.30/api/v0)
OBSERVIUM_USER=admin # Observium username
OBSERVIUM_PASSWORD= # Observium password
OBSERVIUM_SITE=main # NetBox site to assign devices to
OBSERVIUM_DEFAULT_ROLE=Network Device
```
**NOTE:** Observium REST API requires Professional or Enterprise edition.
Community Edition does not expose a REST API. If you're on Community,
skip this collector (Zabbix can cover similar ground).
## VMware Collector [NEED]
```bash
VCENTER_HOST= # vCenter or ESXi IP/hostname
VCENTER_USER=administrator@vsphere.local
VCENTER_PASSWORD= # vCenter/ESXi password
VCENTER_PORT=443 # API port (default: 443)
VCENTER_VERIFY_SSL=false # Set true if valid TLS cert
VCENTER_SITE=main # NetBox site to assign devices to
```
**Setup on vCenter side:** Just need a read-only account.
Minimum role: Read-only → Assign at vCenter root.
## Docker Collector [NEED — only if remote hosts]
Works immediately for local Docker (no env vars needed).
For remote Docker hosts:
```bash
DOCKER_HOSTS= # Comma-separated (e.g., tcp://10.0.0.5:2375,tcp://10.0.0.6:2375)
DOCKER_SITE=main # NetBox site
DOCKER_TLS_VERIFY=false # Set true if Docker TLS is configured
```
**Setup on remote Docker hosts:** Enable the TCP API (port 2375 is unencrypted and unauthenticated, so expose it only on a trusted network):
`dockerd -H unix:///var/run/docker.sock -H tcp://0.0.0.0:2375`
Or use TLS: https://docs.docker.com/engine/security/protect-access/
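
`DOCKER_HOSTS` is split on commas, and an empty value falls back to the local Docker socket. A minimal sketch based on `get_config` in `docker_collector.py`; `parse_docker_hosts` is a hypothetical name for the extracted logic, and falling back to `local` when the value contains only separators is a small deviation from the source.

```python
def parse_docker_hosts(hosts_str: str) -> list[str]:
    """Split a comma-separated DOCKER_HOSTS value into a list of base URLs."""
    hosts = [h.strip() for h in hosts_str.split(",") if h.strip()]
    # "local" is the collector's marker for the default Docker socket.
    return hosts or ["local"]


print(parse_docker_hosts("tcp://10.0.0.5:2375, tcp://10.0.0.6:2375"))
print(parse_docker_hosts(""))
```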
## UniFi Collector [NEED]
Discovers UDM-SE, switches, and APs from the local UniFi Controller API.
```bash
UNIFI_HOST=192.168.1.1 # UDM-SE / Controller IP (or hostname)
UNIFI_USER= # UniFi local admin username
UNIFI_PASSWORD= # UniFi local admin password
UNIFI_SITE=default # UniFi site name (usually "default")
UNIFI_VERIFY_SSL=false # UDM-SE uses self-signed cert by default
UNIFI_IS_UDM=true # true for UDM/UDM-SE/UDR, false for legacy controller
UNIFI_NETBOX_SITE=main # NetBox site to assign devices to
```
**What it discovers:**
- UDM-SE, switches, APs as Devices with model/serial/firmware
- Switch ports with speed, PoE, SFP detection
- WiFi radios with band/channel/power
- VLANs and subnets from network configurations
- WLANs (SSIDs) with auth type
- LLDP neighbors → Cables for topology mapping
**Setup on UDM-SE:** Just need a local admin account.
The API is built-in — no additional setup required.
## NAPALM Plugin (live device status in NetBox UI) [NEED]
These go in the NetBox Docker env, not the project `.env`.
Add to `/home/user/netbox-docker/env/netbox.env`:
```bash
NAPALM_USERNAME=admin # Same SSH creds as your network devices
NAPALM_PASSWORD= # SSH password
```
Then assign NAPALM drivers to Platforms in NetBox:
Devices → Platforms → edit each platform → set NAPALM driver
(e.g., Platform "Cisco IOS" → NAPALM driver: ios)
---
## Quick Checklist
| Collector | What to gather | Priority |
|-----------|---------------|----------|
| Network | SSH creds for routers/switches, fill in inventory.yaml | HIGH |
| CML | Controller IP + admin creds | HIGH |
| Zabbix | API URL + creds or API token | MEDIUM |
| Observium | API URL + creds (needs paid edition) | LOW |
| VMware | vCenter IP + read-only account | MEDIUM |
| Docker | Nothing (local works), or remote TCP URLs | LOW |
| Proxmox VE | Token per host (diode@pve + API token) | HIGH |
| PBS | Token per host (diode@pbs + API token) | HIGH |
| UniFi | UDM-SE IP + local admin creds | HIGH |
| NAPALM | SSH creds in netbox.env + assign drivers to platforms | MEDIUM |
## Testing Each Collector
All collectors support `--dry-run` for safe testing:
```bash
# Activate venv first
source .venv/bin/activate
# Network devices (highest value — discovers cables + topology)
python collectors/network_collector.py -i collectors/inventory.yaml --dry-run
# CML topology
python collectors/cml_collector.py --dry-run
# Zabbix import
python collectors/zabbix_collector.py --dry-run
# Observium import
python collectors/observium_collector.py --dry-run
# VMware
python collectors/vmware_collector.py --dry-run
# Docker (works immediately)
python collectors/docker_collector.py --dry-run
# UniFi (UDM-SE + APs)
python collectors/unifi_collector.py --dry-run
# Proxmox VE (multi-host — already tested)
python collectors/proxmox_collector.py --dry-run
# Proxmox Backup Server
python collectors/pbs_collector.py --dry-run
```
Add `--log-level DEBUG` to any command for verbose output.

0
collectors/__init__.py Normal file

456
collectors/cml_collector.py Normal file

@@ -0,0 +1,456 @@
#!/usr/bin/env python3
"""Cisco Modeling Labs collector for NetBox via Diode SDK.
Syncs CML lab topology into NetBox: devices (nodes), interfaces, cables (links),
L3 addresses, and device configs.
Usage:
python collectors/cml_collector.py --dry-run
python collectors/cml_collector.py
"""
import argparse
import logging
import os
import re
import sys
from virl2_client import ClientLibrary
from netboxlabs.diode.sdk import DiodeClient
from netboxlabs.diode.sdk.ingester import (
Cable,
Device,
DeviceConfig,
DeviceRole,
DeviceType,
Entity,
GenericObject,
Interface,
IPAddress,
Manufacturer,
Platform,
Site,
)
log = logging.getLogger("cml-collector")
# ---------------------------------------------------------------------------
# CML node definition → NetBox mappings
# ---------------------------------------------------------------------------
NODE_DEF_TO_PLATFORM = {
"iosv": "Cisco IOS",
"iosvl2": "Cisco IOS",
"iosxrv": "Cisco IOS-XR",
"iosxrv9000": "Cisco IOS-XR",
"csr1000v": "Cisco IOS-XE",
"cat8000v": "Cisco IOS-XE",
"cat9000v": "Cisco IOS-XE",
"nxosv": "Cisco NX-OS",
"nxosv9000": "Cisco NX-OS",
"asav": "Cisco ASA",
"server": "Linux",
"ubuntu": "Ubuntu",
"alpine": "Alpine Linux",
"desktop": "Linux",
"trex": "TRex Traffic Generator",
"wan_emulator": "Linux",
"external_connector": "External Connector",
"unmanaged_switch": "Unmanaged Switch",
}
NODE_DEF_TO_MANUFACTURER = {
"iosv": "Cisco",
"iosvl2": "Cisco",
"iosxrv": "Cisco",
"iosxrv9000": "Cisco",
"csr1000v": "Cisco",
"cat8000v": "Cisco",
"cat9000v": "Cisco",
"nxosv": "Cisco",
"nxosv9000": "Cisco",
"asav": "Cisco",
}
NODE_DEF_TO_ROLE = {
"iosv": "Router",
"iosvl2": "Switch",
"iosxrv": "Router",
"iosxrv9000": "Router",
"csr1000v": "Router",
"cat8000v": "Router",
"cat9000v": "Switch",
"nxosv": "Switch",
"nxosv9000": "Switch",
"asav": "Firewall",
"server": "Server",
"ubuntu": "Server",
"alpine": "Server",
"desktop": "Server",
"external_connector": "Patch Panel",
"unmanaged_switch": "Switch",
}
NODE_DEF_TO_MODEL = {
"iosv": "IOSv",
"iosvl2": "IOSvL2",
"iosxrv": "IOS-XRv",
"iosxrv9000": "IOS-XRv 9000",
"csr1000v": "CSR1000v",
"cat8000v": "Catalyst 8000V",
"cat9000v": "Catalyst 9000V",
"nxosv": "NX-OSv",
"nxosv9000": "NX-OSv 9000",
"asav": "ASAv",
}
CML_STATE_TO_STATUS = {
"BOOTED": "active",
"STARTED": "active",
"STOPPED": "offline",
"DEFINED_ON_CORE": "planned",
"QUEUED": "planned",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_config() -> dict:
return {
"host": os.environ.get("CML_HOST", ""),
"user": os.environ.get("CML_USER", "admin"),
"password": os.environ.get("CML_PASSWORD", ""),
"lab": os.environ.get("CML_LAB", ""),
"verify_ssl": os.environ.get("CML_VERIFY_SSL", "false").lower() == "true",
"site": os.environ.get("CML_SITE", "CML"),
}
# ---------------------------------------------------------------------------
# Device reference helper
# ---------------------------------------------------------------------------
def _device_ref(name: str, node_def: str, site_name: str) -> Device:
model = NODE_DEF_TO_MODEL.get(node_def, node_def)
manufacturer = NODE_DEF_TO_MANUFACTURER.get(node_def, "Generic")
role = NODE_DEF_TO_ROLE.get(node_def, "Network Device")
return Device(
name=name,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=manufacturer),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
)
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def build_node_entity(node, site_name: str) -> Entity:
node_def = node.node_definition or "unknown"
model = NODE_DEF_TO_MODEL.get(node_def, node_def)
manufacturer = NODE_DEF_TO_MANUFACTURER.get(node_def, "Generic")
role = NODE_DEF_TO_ROLE.get(node_def, "Network Device")
platform = NODE_DEF_TO_PLATFORM.get(node_def, node_def)
status = CML_STATE_TO_STATUS.get(node.state, "planned")
return Entity(device=Device(
name=node.label,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=manufacturer),
),
role=DeviceRole(name=role),
platform=Platform(name=platform),
site=Site(name=site_name),
status=status,
tags=["cml"],
))
def build_interface_entities(node, site_name: str) -> list[Entity]:
entities = []
node_def = node.node_definition or "unknown"
for iface in node.interfaces():
iface_name = iface.label or f"iface{iface.slot}"
# Map interface type from name
iface_type = "virtual"
if re.match(r"^(Gi|GigabitEthernet)", iface_name, re.IGNORECASE):
iface_type = "1000base-t"
elif re.match(r"^(Te|TenGig)", iface_name, re.IGNORECASE):
iface_type = "10gbase-x-sfpp"
elif re.match(r"^(Fa|FastEthernet)", iface_name, re.IGNORECASE):
iface_type = "100base-tx"
elif re.match(r"^(Lo|Loopback)", iface_name, re.IGNORECASE):
iface_type = "virtual"
elif re.match(r"^(eth|ens|enp)", iface_name, re.IGNORECASE):
iface_type = "1000base-t"
entities.append(Entity(interface=Interface(
device=_device_ref(node.label, node_def, site_name),
name=iface_name,
type=iface_type,
enabled=node.state in ("BOOTED", "STARTED"),
tags=["cml"],
)))
return entities
def build_ip_entities(node, site_name: str) -> list[Entity]:
"""Build IP entities from CML node L3 addresses."""
entities = []
node_def = node.node_definition or "unknown"
for iface in node.interfaces():
iface_name = iface.label or f"iface{iface.slot}"
# CML may expose discovered L3 addresses
try:
if hasattr(iface, "discovered_ipv4") and iface.discovered_ipv4:
for addr in iface.discovered_ipv4:
if addr and not addr.startswith("127."):
ip_str = addr if "/" in addr else f"{addr}/24"
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=_device_ref(node.label, node_def, site_name),
name=iface_name,
type="virtual",
),
tags=["cml"],
)))
if hasattr(iface, "discovered_ipv6") and iface.discovered_ipv6:
for addr in iface.discovered_ipv6:
if addr and not addr.lower().startswith("fe80"):
ip_str = addr if "/" in addr else f"{addr}/64"
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=_device_ref(node.label, node_def, site_name),
name=iface_name,
type="virtual",
),
tags=["cml"],
)))
except Exception as exc:
log.debug(" IP discovery unavailable for %s:%s: %s",
node.label, iface_name, exc)
return entities
def build_cable_entities(lab, site_name: str,
node_map: dict[str, tuple[str, str]]) -> list[Entity]:
"""Build Cable entities from CML links.
node_map: {node_id: (node_label, node_definition)}
"""
entities = []
for link in lab.links():
try:
iface_a = link.interface_a
iface_b = link.interface_b
node_a = iface_a.node
node_b = iface_b.node
a_name = iface_a.label or f"iface{iface_a.slot}"
b_name = iface_b.label or f"iface{iface_b.slot}"
a_node_def = node_a.node_definition or "unknown"
b_node_def = node_b.node_definition or "unknown"
cable = Cable(
a_terminations=[GenericObject(object_interface=Interface(
device=_device_ref(node_a.label, a_node_def, site_name),
name=a_name,
type="virtual",
))],
b_terminations=[GenericObject(object_interface=Interface(
device=_device_ref(node_b.label, b_node_def, site_name),
name=b_name,
type="virtual",
))],
status="connected",
tags=["cml"],
)
entities.append(Entity(cable=cable))
log.info(" Cable: %s:%s <-> %s:%s",
node_a.label, a_name, node_b.label, b_name)
except Exception as exc:
log.warning(" Failed to build cable for link: %s", exc)
return entities
def build_config_entity(node, site_name: str) -> Entity | None:
"""Build DeviceConfig entity from CML node configuration."""
node_def = node.node_definition or "unknown"
try:
config = node.config
if not config:
return None
return Entity(
device=_device_ref(node.label, node_def, site_name),
device_config=DeviceConfig(
startup=config.encode("utf-8") if config else None,
),
)
except Exception as exc:
log.debug(" Config unavailable for %s: %s", node.label, exc)
return None
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def collect_all_entities(cfg: dict) -> list[Entity]:
host = cfg["host"]
if not host:
log.error("CML_HOST not set")
sys.exit(1)
url = f"https://{host}" if not host.startswith("http") else host
log.info("Connecting to CML at %s...", url)
client = ClientLibrary(url, cfg["user"], cfg["password"],
ssl_verify=cfg["verify_ssl"])
site_name = cfg["site"]
entities: list[Entity] = []
# Get labs
labs = client.all_labs()
target_lab = cfg.get("lab")
if target_lab:
labs = [lab for lab in labs if lab.title == target_lab or lab.id == target_lab]
if not labs:
log.error("Lab '%s' not found", target_lab)
sys.exit(1)
for lab in labs:
lab.sync()
log.info("Lab: %s (%s) — %d nodes, %d links",
lab.title, lab.state(), len(lab.nodes()), len(lab.links()))
node_map = {}
for node in lab.nodes():
node_def = node.node_definition or "unknown"
node_map[node.id] = (node.label, node_def)
# Skip external connectors and unmanaged switches for device creation
if node_def in ("external_connector", "unmanaged_switch"):
log.debug(" Skipping non-device node: %s (%s)", node.label, node_def)
# Still create interface entities for cable termination
entities.extend(build_interface_entities(node, site_name))
continue
# Device
entities.append(build_node_entity(node, site_name))
# Interfaces
entities.extend(build_interface_entities(node, site_name))
# IPs
entities.extend(build_ip_entities(node, site_name))
# Config
config_entity = build_config_entity(node, site_name)
if config_entity:
entities.append(config_entity)
# Cables from links
entities.extend(build_cable_entities(lab, site_name, node_map))
return entities
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
if not entities:
log.warning("No entities to ingest")
return
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
client_id = os.environ.get("DIODE_CLIENT_ID",
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
os.environ.get("INGESTER_CLIENT_SECRET", ""))
if dry_run:
log.info("DRY RUN: %d entities would be ingested", len(entities))
for i, e in enumerate(entities):
log.info(" [%d] %s", i, e)
return
if not client_secret:
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
sys.exit(1)
log.info("Ingesting %d entities to %s ...", len(entities), target)
with DiodeClient(
target=target,
client_id=client_id,
client_secret=client_secret,
app_name="cml-collector",
app_version="0.1.0",
) as client:
resp = client.ingest(entities=entities)
if resp.errors:
log.error("Ingestion errors: %s", resp.errors)
else:
log.info("Ingestion successful")
def main():
parser = argparse.ArgumentParser(description="CML topology collector for NetBox")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
load_dotenv(args.env_file)
cfg = get_config()
entities = collect_all_entities(cfg)
log.info("Total entities: %d", len(entities))
ingest_entities(entities, dry_run=args.dry_run)
log.info("Done!")
if __name__ == "__main__":
main()
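
The name-based interface type mapping in `build_interface_entities` can be exercised on its own. The sketch below restates the same patterns as an ordered table; `IFACE_TYPE_PATTERNS` and `guess_iface_type` are hypothetical names, not part of the collector.

```python
import re

# Ordered (pattern, NetBox interface type) pairs; the first match wins.
IFACE_TYPE_PATTERNS = [
    (r"^(Gi|GigabitEthernet)", "1000base-t"),
    (r"^(Te|TenGig)", "10gbase-x-sfpp"),
    (r"^(Fa|FastEthernet)", "100base-tx"),
    (r"^(Lo|Loopback)", "virtual"),
    (r"^(eth|ens|enp)", "1000base-t"),
]


def guess_iface_type(name: str, default: str = "virtual") -> str:
    """Map an interface name to a NetBox interface type by prefix."""
    for pattern, iface_type in IFACE_TYPE_PATTERNS:
        if re.match(pattern, name, re.IGNORECASE):
            return iface_type
    return default


for example in ("GigabitEthernet0/0", "TenGigE0/0/0/1", "Loopback0", "mgmt0"):
    print(example, "->", guess_iface_type(example))
```

Because matching is case-insensitive, the `eth` prefix also matches names such as `Ethernet1/1`, which may or may not be the intended type for NX-OS nodes.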


@@ -0,0 +1,358 @@
#!/usr/bin/env python3
"""Docker container collector for NetBox via Diode SDK.
Discovers Docker containers across one or more hosts and ingests them
into NetBox as VirtualMachines with VMInterfaces and IPAddresses.
Usage:
python collectors/docker_collector.py --dry-run
python collectors/docker_collector.py
"""
import argparse
import logging
import os
import sys
import docker
from netboxlabs.diode.sdk import DiodeClient
from netboxlabs.diode.sdk.ingester import (
Cluster,
ClusterType,
CustomFieldValue,
Device,
DeviceRole,
DeviceType,
Entity,
IPAddress,
Manufacturer,
Platform,
Site,
VirtualMachine,
VMInterface,
)
log = logging.getLogger("docker-collector")
# ---------------------------------------------------------------------------
# Status mappings
# ---------------------------------------------------------------------------
CONTAINER_STATUS_MAP = {
"running": "active",
"paused": "active",
"restarting": "active",
"created": "planned",
"exited": "offline",
"dead": "failed",
"removing": "decommissioning",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_config() -> dict:
# Support multiple Docker hosts (comma-separated)
hosts_str = os.environ.get("DOCKER_HOSTS", "")
if hosts_str:
hosts = [h.strip() for h in hosts_str.split(",") if h.strip()]
else:
hosts = ["local"] # Use local Docker socket
return {
"hosts": hosts,
"site": os.environ.get("DOCKER_SITE", "main"),
"tls_verify": os.environ.get("DOCKER_TLS_VERIFY", "false").lower() == "true",
}
# ---------------------------------------------------------------------------
# VM reference helper
# ---------------------------------------------------------------------------
def _vm_ref(name: str, cluster_name: str, site_name: str) -> VirtualMachine:
return VirtualMachine(
name=name,
site=Site(name=site_name),
cluster=Cluster(
name=cluster_name,
type=ClusterType(name="Docker"),
scope_site=Site(name=site_name),
),
role=DeviceRole(name="Docker Container"),
)
# ---------------------------------------------------------------------------
# Data collection
# ---------------------------------------------------------------------------
def connect_docker(host: str, tls_verify: bool = False) -> docker.DockerClient:
"""Connect to a Docker host."""
if host == "local":
return docker.from_env()
else:
tls_config = None
if host.startswith("https://") and tls_verify:
tls_config = docker.tls.TLSConfig(verify=True)
return docker.DockerClient(base_url=host, tls=tls_config)
def get_host_info(client: docker.DockerClient) -> dict:
"""Get Docker host system info."""
return client.info()
def get_containers(client: docker.DockerClient, all_containers: bool = True) -> list:
"""Get all containers from Docker host."""
return client.containers.list(all=all_containers)
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def build_cluster_entity(host_name: str, site_name: str) -> Entity:
"""Build a Cluster entity for the Docker host."""
return Entity(cluster=Cluster(
name=host_name,
type=ClusterType(name="Docker"),
scope_site=Site(name=site_name),
status="active",
tags=["docker"],
))
def build_container_entities(container, host_name: str,
site_name: str) -> list[Entity]:
"""Build VirtualMachine + VMInterface + IPAddress entities for a container."""
entities = []
# Container info
name = container.name
status = CONTAINER_STATUS_MAP.get(container.status, "active")
image = container.image.tags[0] if container.image.tags else str(container.image.id)[:20]
short_id = container.short_id
# Labels/env for metadata
labels = container.labels or {}
compose_project = labels.get("com.docker.compose.project", "")
compose_service = labels.get("com.docker.compose.service", "")
# Custom fields
custom_fields = {
"docker_container_id": CustomFieldValue(text=short_id),
}
if compose_project:
custom_fields["docker_compose_project"] = CustomFieldValue(text=compose_project)
# VirtualMachine entity
vm_kwargs = dict(
name=name,
status=status,
site=Site(name=site_name),
cluster=Cluster(
name=host_name,
type=ClusterType(name="Docker"),
scope_site=Site(name=site_name),
),
role=DeviceRole(name="Docker Container"),
platform=Platform(name="Docker"),
comments=f"Image: {image}",
tags=["docker"],
custom_fields=custom_fields,
)
entities.append(Entity(virtual_machine=VirtualMachine(**vm_kwargs)))
# Network interfaces and IPs
vm_ref = _vm_ref(name, host_name, site_name)
try:
# container.attrs has full inspect data
networks = container.attrs.get("NetworkSettings", {}).get("Networks", {})
for net_name, net_data in networks.items():
ip = net_data.get("IPAddress", "")
mac = net_data.get("MacAddress", "")
gateway = net_data.get("Gateway", "")
prefix_len = net_data.get("IPPrefixLen", 0)
ipv6 = net_data.get("GlobalIPv6Address", "")
ipv6_prefix = net_data.get("GlobalIPv6PrefixLen", 0)
# VMInterface
iface_name = net_name[:64]
iface_kwargs = dict(
virtual_machine=vm_ref,
name=iface_name,
enabled=True,
tags=["docker"],
)
if mac:
iface_kwargs["mac_address"] = mac
entities.append(Entity(vm_interface=VMInterface(**iface_kwargs)))
# IPv4 address
if ip and ip != "0.0.0.0":
ip_str = f"{ip}/{prefix_len}" if prefix_len else f"{ip}/24"
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_vm_interface=VMInterface(
virtual_machine=vm_ref,
name=iface_name,
),
tags=["docker"],
)))
# IPv6 address
if ipv6:
ipv6_str = f"{ipv6}/{ipv6_prefix}" if ipv6_prefix else f"{ipv6}/64"
entities.append(Entity(ip_address=IPAddress(
address=ipv6_str,
status="active",
assigned_object_vm_interface=VMInterface(
virtual_machine=vm_ref,
name=iface_name,
),
tags=["docker"],
)))
except Exception as exc:
log.warning(" Network info unavailable for %s: %s", name, exc)
return entities
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def collect_all_entities(cfg: dict) -> list[Entity]:
site_name = cfg["site"]
entities: list[Entity] = []
for host in cfg["hosts"]:
log.info("Connecting to Docker host: %s", host)
try:
client = connect_docker(host, cfg["tls_verify"])
except Exception as exc:
log.error("Failed to connect to Docker host %s: %s", host, exc)
continue
# Get host info for cluster name
try:
info = get_host_info(client)
host_name = info.get("Name", host)
log.info(" Host: %s (Docker %s, %d containers)",
host_name, info.get("ServerVersion", "?"),
info.get("Containers", 0))
except Exception as exc:
log.warning(" Cannot get host info: %s", exc)
host_name = host
# Cluster entity
entities.append(build_cluster_entity(host_name, site_name))
# Container entities
try:
containers = get_containers(client)
log.info(" Found %d containers", len(containers))
for container in containers:
try:
entities.extend(build_container_entities(
container, host_name, site_name
))
except Exception as exc:
log.error(" Failed to process container %s: %s",
container.name, exc)
except Exception as exc:
log.error(" Failed to list containers on %s: %s", host_name, exc)
return entities
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
if not entities:
log.warning("No entities to ingest")
return
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
client_id = os.environ.get("DIODE_CLIENT_ID",
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
os.environ.get("INGESTER_CLIENT_SECRET", ""))
if dry_run:
log.info("DRY RUN: %d entities would be ingested", len(entities))
for i, e in enumerate(entities):
log.info(" [%d] %s", i, e)
return
if not client_secret:
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
sys.exit(1)
log.info("Ingesting %d entities to %s ...", len(entities), target)
with DiodeClient(
target=target,
client_id=client_id,
client_secret=client_secret,
app_name="docker-collector",
app_version="0.1.0",
) as client:
resp = client.ingest(entities=entities)
if resp.errors:
log.error("Ingestion errors: %s", resp.errors)
else:
log.info("Ingestion successful")
def main():
parser = argparse.ArgumentParser(description="Docker container collector for NetBox")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--all", action="store_true",
help="Include stopped containers (default: running only)")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
load_dotenv(args.env_file)
cfg = get_config()
entities = collect_all_entities(cfg)
log.info("Total entities: %d", len(entities))
ingest_entities(entities, dry_run=args.dry_run)
log.info("Done!")
if __name__ == "__main__":
main()
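
Review note: `ingest_entities` resolves the Diode credentials with nested `os.environ.get` calls. A minimal sketch of the same preference order as a helper follows. `first_env` is a hypothetical name, not part of this PR or of the Diode SDK. Unlike the nested calls, this sketch also skips variables that are set but empty.

```python
import os


def first_env(names, default=""):
    """Return the first set, non-empty environment variable among names.

    Hypothetical helper: it mirrors the DIODE_* then INGESTER_* lookup
    order used by the collector, but treats empty strings as unset.
    """
    for name in names:
        value = os.environ.get(name)
        if value:
            return value
    return default


# Same preference order as the Docker collector's ingest_entities().
client_id = first_env(("DIODE_CLIENT_ID", "INGESTER_CLIENT_ID"), "diode-ingester")
client_secret = first_env(("DIODE_CLIENT_SECRET", "INGESTER_CLIENT_SECRET"))
```

The PBS and Proxmox collectors in this PR check `INGESTER_*` first, so the order of the names passed in would need to match whichever precedence is intended.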


@ -0,0 +1,36 @@
# Network Device Inventory for network_collector.py
#
# Copy this file to inventory.yaml and fill in your device details.
# Fields in 'defaults' apply to all devices unless overridden per-device.
defaults:
site: main
role: Network Device
username: admin
password: cisco
secret: cisco # enable secret (if required)
driver: ios # NAPALM driver: ios, iosxr, eos, junos, nxos
timeout: 60
devices:
# Cisco IOS routers
- host: 10.10.20.1
driver: ios
role: Router
# Cisco IOS switches
- host: 10.10.20.55
driver: ios
role: Switch
# Cisco IOS-XR devices
# - host: 10.10.20.100
# driver: iosxr
# role: Router
# Brocade switches (requires napalm-ruckus-fastiron or use netmiko fallback)
# - host: 10.10.20.200
# driver: ros # or use custom driver name
# role: Switch
# optional_args:
# transport: ssh
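
Review note: the header comment says fields in `defaults` apply to every device unless overridden per device. The diff for `network_collector.py` is not shown on this page, so the following is only a sketch of one way that rule could be applied, not the collector's actual logic. `merge_inventory` is a hypothetical name, and the inventory is given as an already-parsed dict to keep the example free of a YAML dependency.

```python
def merge_inventory(inventory: dict) -> list[dict]:
    """Overlay each device entry on top of the shared defaults.

    Assumption: a shallow merge is enough. Nested keys such as
    optional_args are replaced as a whole, not merged key by key.
    """
    defaults = inventory.get("defaults") or {}
    merged = []
    for device in inventory.get("devices") or []:
        merged.append({**defaults, **device})
    return merged


# Parsed form of a trimmed-down inventory.yaml.
inventory = {
    "defaults": {"site": "main", "role": "Network Device",
                 "driver": "ios", "timeout": 60},
    "devices": [
        {"host": "10.10.20.1", "driver": "ios", "role": "Router"},
        {"host": "10.10.20.55", "role": "Switch"},
    ],
}
devices = merge_inventory(inventory)
```

Each resulting entry carries the per-device values where given and the shared defaults everywhere else.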

File diff suppressed because it is too large


@ -0,0 +1,378 @@
#!/usr/bin/env python3
"""Observium collector for NetBox via Diode SDK.
Pulls device, port, and IP data from the Observium REST API for brownfield
import into NetBox. Adds Observium device IDs as custom fields for cross-referencing.
Note: Observium API requires Professional or Enterprise edition.
Community Edition users can skip this collector.
Usage:
python collectors/observium_collector.py --dry-run
python collectors/observium_collector.py
"""
import argparse
import logging
import os
import sys
import requests
from requests.auth import HTTPBasicAuth
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
CustomFieldValue,
Device,
DeviceRole,
DeviceType,
Entity,
Interface,
IPAddress,
Manufacturer,
Platform,
Site,
VLAN,
)
log = logging.getLogger("observium-collector")
# ---------------------------------------------------------------------------
# Observium → NetBox mappings
# ---------------------------------------------------------------------------
OBSERVIUM_STATUS_MAP = {
"1": "active", # up
"0": "offline", # down
}
IF_TYPE_MAP = {
"ethernetCsmacd": "1000base-t",
"gigabitEthernet": "1000base-t",
"fastEthernet": "100base-tx",
"propVirtual": "virtual",
"l3ipvlan": "virtual",
"tunnel": "virtual",
"softwareLoopback": "virtual",
"ieee8023adLag": "lag",
"bridge": "bridge",
"other": "other",
}
OS_TO_PLATFORM = {
"ios": "Cisco IOS",
"iosxe": "Cisco IOS-XE",
"iosxr": "Cisco IOS-XR",
"nxos": "Cisco NX-OS",
"junos": "Juniper Junos",
"linux": "Linux",
"vmware": "VMware ESXi",
"ironware": "Brocade IronWare",
"fastiron": "Brocade FastIron",
"windows": "Windows",
"freebsd": "FreeBSD",
"proxmox": "Proxmox VE",
}
OS_TO_MANUFACTURER = {
"ios": "Cisco",
"iosxe": "Cisco",
"iosxr": "Cisco",
"nxos": "Cisco",
"junos": "Juniper",
"ironware": "Brocade",
"fastiron": "Brocade",
}
OS_TO_ROLE = {
"ios": "Router",
"iosxe": "Router",
"iosxr": "Router",
"nxos": "Switch",
"junos": "Router",
"ironware": "Switch",
"fastiron": "Switch",
"linux": "Server",
"vmware": "Hypervisor",
"windows": "Server",
"proxmox": "Hypervisor",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_config() -> dict:
return {
"url": os.environ.get("OBSERVIUM_URL", ""),
"user": os.environ.get("OBSERVIUM_USER", "admin"),
"password": os.environ.get("OBSERVIUM_PASSWORD", ""),
"site": os.environ.get("OBSERVIUM_SITE", "main"),
"default_role": os.environ.get("OBSERVIUM_DEFAULT_ROLE", "Network Device"),
}
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def api_get(base_url: str, endpoint: str, auth: HTTPBasicAuth,
params: dict | None = None) -> dict:
"""Make a GET request to the Observium API."""
url = f"{base_url}/{endpoint}"
resp = requests.get(url, auth=auth, params=params, timeout=30, verify=False)
resp.raise_for_status()
return resp.json()
# ---------------------------------------------------------------------------
# Device reference helper
# ---------------------------------------------------------------------------
def _device_ref(name: str, model: str, manufacturer: str, role: str,
site_name: str) -> Device:
return Device(
name=name,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=manufacturer),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
)
# ---------------------------------------------------------------------------
# Data collection and entity building
# ---------------------------------------------------------------------------
def collect_all_entities(cfg: dict) -> list[Entity]:
base_url = cfg["url"].rstrip("/")
if not base_url:
log.error("OBSERVIUM_URL not set")
sys.exit(1)
auth = HTTPBasicAuth(cfg["user"], cfg["password"])
site_name = cfg["site"]
default_role = cfg["default_role"]
entities: list[Entity] = []
# --- Devices ---
log.info("Fetching devices from Observium...")
try:
devices_resp = api_get(base_url, "devices", auth)
except Exception as exc:
log.error("Failed to fetch devices: %s", exc)
sys.exit(1)
devices = devices_resp.get("devices", {})
if isinstance(devices, list):
# Key by Observium's own device_id so port records can be matched to devices
devices = {str(d.get("device_id", i)): d for i, d in enumerate(devices)}
log.info("Found %d devices", len(devices))
device_id_to_name = {}
for dev_id, dev_data in devices.items():
hostname = dev_data.get("hostname") or dev_data.get("sysName") or f"device-{dev_id}"
device_id_to_name[dev_id] = hostname
os_type = dev_data.get("os") or ""
model = dev_data.get("hardware") or "Unknown"
vendor = dev_data.get("vendor") or OS_TO_MANUFACTURER.get(os_type, "Unknown")
serial = dev_data.get("serial") or ""
# str() handles both "1" and 1; unknown values are treated as offline
status = OBSERVIUM_STATUS_MAP.get(str(dev_data.get("status")), "offline")
role = OS_TO_ROLE.get(os_type, default_role)
platform = OS_TO_PLATFORM.get(os_type)
custom_fields = {
"observium_device_id": CustomFieldValue(text=str(dev_id)),
}
device_kwargs = dict(
name=hostname,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=vendor),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
serial=serial[:50] if serial else "",
status=status,
tags=["observium"],
custom_fields=custom_fields,
)
if platform:
device_kwargs["platform"] = Platform(name=platform)
entities.append(Entity(device=Device(**device_kwargs)))
# --- Ports (interfaces) ---
log.info("Fetching ports from Observium...")
try:
ports_resp = api_get(base_url, "ports", auth)
ports = ports_resp.get("ports", {})
if isinstance(ports, list):
# Key by port_id: it is used below to build the per-port IP endpoint path
ports = {str(p.get("port_id", i)): p for i, p in enumerate(ports)}
log.info("Found %d ports", len(ports))
except Exception as exc:
log.warning("Failed to fetch ports: %s", exc)
ports = {}
port_id_to_info = {}
for port_id, port_data in ports.items():
dev_id = str(port_data.get("device_id", ""))
hostname = device_id_to_name.get(dev_id)
if not hostname:
continue
iface_name = port_data.get("ifName") or port_data.get("port_label") or f"port{port_id}"
if_type = port_data.get("ifType") or "other"
mac = port_data.get("ifPhysAddress") or ""
speed = int(port_data.get("ifSpeed", 0) or 0) // 1000000 # bps → Mbps
mtu = int(port_data.get("ifMtu", 0) or 0)
description = port_data.get("ifAlias") or ""
enabled = port_data.get("ifAdminStatus") == "up"
iface_type = IF_TYPE_MAP.get(if_type, "other")
if iface_type == "other" and speed:
from collectors.network_collector import SPEED_TO_TYPE
iface_type = SPEED_TO_TYPE.get(speed, "other")
# Look up device model for reference
dev_data = devices.get(dev_id, {})
os_type = dev_data.get("os") or ""
model = dev_data.get("hardware") or "Unknown"
vendor = dev_data.get("vendor") or OS_TO_MANUFACTURER.get(os_type, "Unknown")
role = OS_TO_ROLE.get(os_type, default_role)
dev_ref = _device_ref(hostname, model, vendor, role, site_name)
entities.append(Entity(interface=Interface(
device=dev_ref,
name=iface_name[:64],
type=iface_type,
mac_address=mac,
mtu=mtu or None, # NetBox requires MTU >= 1, so send None instead of 0
speed=speed * 1000 if speed else 0, # Mbps → Kbps
enabled=enabled,
description=description[:200] if description else "",
tags=["observium"],
)))
port_id_to_info[port_id] = (hostname, iface_name, iface_type, model, vendor, role)
# --- IP addresses ---
log.info("Fetching IP addresses from Observium...")
try:
# Observium may expose IPs through the ports/addresses endpoint
for port_id, (hostname, iface_name, iface_type, model, vendor, role) in port_id_to_info.items():
try:
addr_resp = api_get(base_url, f"ports/{port_id}/ip", auth)
addresses = addr_resp.get("addresses", {})
if isinstance(addresses, list):
addresses = {str(i): a for i, a in enumerate(addresses)}
for addr_id, addr_data in addresses.items():
ip = addr_data.get("ipv4_address") or addr_data.get("ipv6_address") or ""
prefix_len = addr_data.get("ipv4_prefixlen") or addr_data.get("ipv6_prefixlen") or ""
if ip and prefix_len:
ip_str = f"{ip}/{prefix_len}"
dev_ref = _device_ref(hostname, model, vendor, role, site_name)
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=dev_ref,
name=iface_name,
type=iface_type,
),
tags=["observium"],
)))
except Exception as exc:
# IP endpoint may not be available for all ports
log.debug("No IP data for port %s: %s", port_id, exc)
except Exception as exc:
log.warning("IP address collection failed: %s", exc)
return entities
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
if not entities:
log.warning("No entities to ingest")
return
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
client_id = os.environ.get("DIODE_CLIENT_ID",
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
os.environ.get("INGESTER_CLIENT_SECRET", ""))
if dry_run:
log.info("DRY RUN: %d entities would be ingested", len(entities))
for i, e in enumerate(entities):
log.info(" [%d] %s", i, e)
return
if not client_secret:
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
sys.exit(1)
log.info("Ingesting %d entities to %s ...", len(entities), target)
with DiodeClient(
target=target,
client_id=client_id,
client_secret=client_secret,
app_name="observium-collector",
app_version="0.1.0",
) as client:
resp = client.ingest(entities=entities)
if resp.errors:
log.error("Ingestion errors: %s", resp.errors)
else:
log.info("Ingestion successful")
def main():
parser = argparse.ArgumentParser(description="Observium collector for NetBox")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
load_dotenv(args.env_file)
cfg = get_config()
entities = collect_all_entities(cfg)
log.info("Total entities: %d", len(entities))
ingest_entities(entities, dry_run=args.dry_run)
log.info("Done!")
if __name__ == "__main__":
main()
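
Review note: `collect_all_entities` accepts either a dict or a list under the `devices`, `ports`, and `addresses` keys. A minimal sketch of normalising both shapes into one mapping keyed by the record's own identifier follows. `index_by_id` is a hypothetical helper, and the assumption that each record carries its id in a field such as `device_id` or `port_id` is not confirmed by this diff for every endpoint.

```python
def index_by_id(records, id_field):
    """Return {str(id): record} for a dict- or list-shaped API payload.

    Assumption: list entries carry their own id under id_field; when the
    field is missing, the list position is used as a last resort.
    """
    if isinstance(records, dict):
        return {str(key): value for key, value in records.items()}
    indexed = {}
    for position, record in enumerate(records or []):
        indexed[str(record.get(id_field, position))] = record
    return indexed


# A list-shaped payload ends up keyed the same way as a dict-shaped one.
as_list = [{"device_id": "7", "hostname": "r1"}, {"hostname": "sw1"}]
as_dict = {"7": {"device_id": "7", "hostname": "r1"}}
by_id_from_list = index_by_id(as_list, "device_id")
by_id_from_dict = index_by_id(as_dict, "device_id")
```

Keying by the real identifier matters because port records refer back to devices through `device_id`, and the per-port IP lookup builds its URL from the port key.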

collectors/pbs_collector.py Normal file

@ -0,0 +1,464 @@
#!/usr/bin/env python3
"""Proxmox Backup Server collector for NetBox via Diode SDK.
Discovers PBS hosts, network interfaces, IPs, and datastores
and ingests them into NetBox through the Diode pipeline.
Usage:
python collectors/pbs_collector.py --dry-run
python collectors/pbs_collector.py
"""
import argparse
import logging
import os
import sys
from proxmoxer import ProxmoxAPI
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
Device,
DeviceRole,
DeviceType,
Entity,
Interface,
IPAddress,
Manufacturer,
Platform,
Service,
Site,
)
log = logging.getLogger("pbs-collector")
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
PBS_MANUFACTURER = "Proxmox Server Solutions GmbH"
PBS_MODEL = "Proxmox Backup Server"
IFACE_TYPE_MAP = {
"eth": "1000base-t",
"bond": "lag",
"bridge": "bridge",
"vlan": "virtual",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_diode_config() -> dict:
cfg = {
"diode_target": os.getenv("DIODE_TARGET", "grpc://localhost:8080/diode"),
"client_id": os.getenv("INGESTER_CLIENT_ID", os.getenv("DIODE_CLIENT_ID", "diode-ingester")),
"client_secret": os.getenv("INGESTER_CLIENT_SECRET", os.getenv("DIODE_CLIENT_SECRET")),
}
if not cfg["client_secret"]:
log.error("Missing required env var: INGESTER_CLIENT_SECRET or DIODE_CLIENT_SECRET")
sys.exit(1)
return cfg
def get_pbs_hosts() -> list[dict]:
"""Build list of PBS host configs from numbered env vars.
Supports PBS_HOST_1/PBS_USER_1/... through PBS_HOST_N.
The legacy single PBS_HOST, if set and not already listed, is added as the first entry.
"""
hosts = []
misses = 0
for i in range(1, 100):
host = os.getenv(f"PBS_HOST_{i}")
if host is None:
misses += 1
if misses >= 3:
break
continue
misses = 0
hosts.append({
"pbs_host": host,
"pbs_user": os.getenv(f"PBS_USER_{i}", os.getenv("PBS_USER", "root@pam")),
"pbs_token_name": os.getenv(f"PBS_TOKEN_NAME_{i}", os.getenv("PBS_TOKEN_NAME")),
"pbs_token_value": os.getenv(f"PBS_TOKEN_VALUE_{i}", os.getenv("PBS_TOKEN_VALUE")),
"pbs_verify_ssl": os.getenv(
f"PBS_VERIFY_SSL_{i}", os.getenv("PBS_VERIFY_SSL", "false")
).lower() in ("true", "1", "yes"),
"pbs_port": int(os.getenv(f"PBS_PORT_{i}", os.getenv("PBS_PORT", "8007"))),
"site_name": os.getenv(f"PBS_SITE_{i}", os.getenv("SITE_NAME", "main")),
})
# Backward compat with single PBS_HOST
legacy_host = os.getenv("PBS_HOST")
if legacy_host:
already_listed = any(h["pbs_host"] == legacy_host for h in hosts)
if not already_listed:
hosts.insert(0, {
"pbs_host": legacy_host,
"pbs_user": os.getenv("PBS_USER", "root@pam"),
"pbs_token_name": os.getenv("PBS_TOKEN_NAME"),
"pbs_token_value": os.getenv("PBS_TOKEN_VALUE"),
"pbs_verify_ssl": os.getenv("PBS_VERIFY_SSL", "false").lower() in ("true", "1", "yes"),
"pbs_port": int(os.getenv("PBS_PORT", "8007")),
"site_name": os.getenv("SITE_NAME", "main"),
})
if not hosts:
log.error("No PBS hosts configured (set PBS_HOST or PBS_HOST_1)")
sys.exit(1)
for i, h in enumerate(hosts):
missing = [k for k in ("pbs_host", "pbs_token_name", "pbs_token_value") if not h.get(k)]
if missing:
log.error("PBS host %d (%s): missing %s", i + 1, h.get("pbs_host", "?"), ", ".join(missing))
sys.exit(1)
return hosts
# ---------------------------------------------------------------------------
# PBS connection
# ---------------------------------------------------------------------------
def connect_pbs(config: dict) -> ProxmoxAPI:
"""Create and return a ProxmoxAPI connection to PBS."""
return ProxmoxAPI(
config["pbs_host"],
service="PBS",
port=config["pbs_port"],
user=config["pbs_user"],
token_name=config["pbs_token_name"],
token_value=config["pbs_token_value"],
verify_ssl=config["pbs_verify_ssl"],
backend="https",
)
# ---------------------------------------------------------------------------
# Data collection (pure PBS API calls)
# ---------------------------------------------------------------------------
def collect_node_status(pbs: ProxmoxAPI) -> dict:
return pbs.nodes("localhost").status.get()
def collect_node_networks(pbs: ProxmoxAPI) -> list[dict]:
return pbs.nodes("localhost").network.get()
def collect_datastores(pbs: ProxmoxAPI) -> list[dict]:
return pbs.admin.datastore.get()
def collect_datastore_usage(pbs: ProxmoxAPI) -> list[dict]:
try:
return pbs.status("datastore-usage").get()
except Exception as exc:
log.debug("Datastore usage endpoint unavailable: %s", exc)
return []
def collect_datastore_snapshots(pbs: ProxmoxAPI, store: str) -> list[dict]:
try:
return pbs.admin.datastore(store).snapshots.get()
except Exception as exc:
log.debug("Cannot get snapshots for %s: %s", store, exc)
return []
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _device_ref(hostname: str, site_name: str) -> Device:
return Device(
name=hostname,
device_type=DeviceType(
model=PBS_MODEL,
manufacturer=Manufacturer(name=PBS_MANUFACTURER),
),
role=DeviceRole(name="Backup Server"),
site=Site(name=site_name),
)
def _map_interface_type(pve_type: str, iface_name: str) -> str:
if pve_type in IFACE_TYPE_MAP:
return IFACE_TYPE_MAP[pve_type]
if iface_name.startswith(("eno", "enp", "ens", "eth")):
return "1000base-t"
if iface_name.startswith("vmbr"):
return "bridge"
return "other"
def _netmask_to_prefix(mask: str) -> int:
try:
return sum(bin(int(o)).count("1") for o in mask.split("."))
except (ValueError, AttributeError):
return 24
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def build_device_entity(hostname: str, node_status: dict, site_name: str) -> Entity:
cpuinfo = node_status.get("cpuinfo", {})
memory = node_status.get("memory", {})
mem_gb = memory.get("total", 0) / (1024 ** 3)
info = node_status.get("info", {})
pbs_version = info.get("version", "")
kernel = node_status.get("kversion", "")
return Entity(device=Device(
name=hostname,
device_type=DeviceType(
model=PBS_MODEL,
manufacturer=Manufacturer(name=PBS_MANUFACTURER),
),
role=DeviceRole(name="Backup Server"),
platform=Platform(
name="Proxmox Backup Server",
manufacturer=Manufacturer(name=PBS_MANUFACTURER),
),
site=Site(name=site_name),
status="active",
description=f"PBS {pbs_version}, kernel {kernel}" if pbs_version else "",
comments=f"CPU: {cpuinfo.get('model', 'N/A')}, "
f"Cores: {cpuinfo.get('cores', '?')}, "
f"Memory: {mem_gb:.1f} GB",
tags=["proxmox", "pbs"],
))
def build_interface_entities(hostname: str, interfaces: list[dict],
site_name: str) -> list[Entity]:
entities = []
for iface in interfaces:
name = iface.get("iface", "")
if name == "lo":
continue
pve_type = iface.get("type", "")
desc_parts = []
if iface.get("bridge_ports"):
desc_parts.append(f"bridge_ports: {iface['bridge_ports']}")
if iface.get("comments"):
desc_parts.append(iface["comments"])
entities.append(Entity(interface=Interface(
device=_device_ref(hostname, site_name),
name=name,
type=_map_interface_type(pve_type, name),
enabled=bool(iface.get("active", 0)),
mtu=int(iface["mtu"]) if iface.get("mtu") else None,
description=", ".join(desc_parts)[:200] if desc_parts else "",
tags=["proxmox", "pbs"],
)))
return entities
def build_ip_entities(hostname: str, interfaces: list[dict],
site_name: str) -> list[Entity]:
entities = []
for iface in interfaces:
name = iface.get("iface", "")
if name == "lo":
continue
cidr = iface.get("cidr")
address = iface.get("address")
netmask = iface.get("netmask")
if cidr:
ip_str = cidr
elif address and netmask:
ip_str = f"{address}/{_netmask_to_prefix(netmask)}"
elif address:
ip_str = f"{address}/24"
else:
continue
pve_type = iface.get("type", "")
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=_device_ref(hostname, site_name),
name=name,
type=_map_interface_type(pve_type, name),
),
tags=["proxmox", "pbs"],
)))
# IPv6
cidr6 = iface.get("cidr6")
if cidr6:
entities.append(Entity(ip_address=IPAddress(
address=cidr6,
status="active",
assigned_object_interface=Interface(
device=_device_ref(hostname, site_name),
name=name,
type=_map_interface_type(pve_type, name),
),
tags=["proxmox", "pbs"],
)))
return entities
def build_datastore_entities(hostname: str, datastores: list[dict],
usage_map: dict, site_name: str,
pbs: ProxmoxAPI) -> list[Entity]:
entities = []
for ds in datastores:
store_name = ds.get("name", ds.get("store", "unknown"))
usage = usage_map.get(store_name, {})
total = usage.get("total", 0)
used = usage.get("used", 0)
total_gb = total / (1024 ** 3) if total else 0
used_gb = used / (1024 ** 3) if used else 0
pct = (used / total * 100) if total else 0
path = ds.get("path", "N/A")
gc_schedule = ds.get("gc-schedule", "N/A")
# Count snapshots
try:
snapshots = collect_datastore_snapshots(pbs, store_name)
snap_count = len(snapshots)
except Exception:
snap_count = 0
log.info(" Datastore '%s': %.1f GB total, %.1f GB used (%.0f%%), %d snapshots",
store_name, total_gb, used_gb, pct, snap_count)
entities.append(Entity(service=Service(
device=_device_ref(hostname, site_name),
name=f"ds-{store_name}",
protocol="tcp",
ports=[8007],
description=f"PBS Datastore: {total_gb:.1f} GB total, {used_gb:.1f} GB used ({pct:.0f}%)",
comments=f"Path: {path}\nSnapshots: {snap_count}\nGC schedule: {gc_schedule}",
tags=["proxmox", "pbs", "datastore"],
)))
return entities
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def collect_all_entities(pbs: ProxmoxAPI, host_cfg: dict) -> list[Entity]:
site = host_cfg["site_name"]
entities: list[Entity] = []
node_status = collect_node_status(pbs)
hostname = node_status.get("hostname", host_cfg["pbs_host"])
log.info("PBS host: %s", hostname)
# Device
entities.append(build_device_entity(hostname, node_status, site))
# Interfaces and IPs
networks = collect_node_networks(pbs)
entities.extend(build_interface_entities(hostname, networks, site))
entities.extend(build_ip_entities(hostname, networks, site))
# Datastores as Services
datastores = collect_datastores(pbs)
log.info(" Datastores: %d", len(datastores))
usage_list = collect_datastore_usage(pbs)
usage_map = {u.get("store", u.get("name", "")): u for u in usage_list}
entities.extend(build_datastore_entities(hostname, datastores, usage_map, site, pbs))
return entities
def ingest_entities(entities: list[Entity], config: dict, dry_run: bool = False) -> None:
if dry_run:
client = DiodeDryRunClient(app_name="pbs-collector")
log.info("Dry-run mode: writing entities to stdout")
client.ingest(entities=entities)
return
with DiodeClient(
target=config["diode_target"],
client_id=config["client_id"],
client_secret=config["client_secret"],
app_name="pbs-collector",
app_version="0.1.0",
) as client:
log.info("Ingesting %d entities to %s ...", len(entities), config["diode_target"])
response = client.ingest(entities=entities)
if response.errors:
log.error("Ingestion errors: %s", response.errors)
else:
log.info("Ingestion successful")
def main():
parser = argparse.ArgumentParser(description="Proxmox Backup Server collector for NetBox")
parser.add_argument("--dry-run", action="store_true", help="Output entities without ingesting")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env", help="Path to .env file")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(levelname)-8s %(message)s",
)
load_dotenv(args.env_file)
diode_config = get_diode_config()
pbs_hosts = get_pbs_hosts()
all_entities: list[Entity] = []
for i, host_cfg in enumerate(pbs_hosts, 1):
log.info("=== PBS Host %d/%d: %s ===", i, len(pbs_hosts), host_cfg["pbs_host"])
try:
pbs = connect_pbs(host_cfg)
entities = collect_all_entities(pbs, host_cfg)
log.info("Collected %d entities from %s", len(entities), host_cfg["pbs_host"])
all_entities.extend(entities)
except Exception:
log.exception("Failed to collect from PBS host %s", host_cfg["pbs_host"])
log.info("Total: %d entities from %d PBS host(s)", len(all_entities), len(pbs_hosts))
if not all_entities:
log.warning("No entities collected. Exiting.")
return
ingest_entities(all_entities, diode_config, dry_run=args.dry_run)
log.info("Done.")
if __name__ == "__main__":
main()


@ -0,0 +1,843 @@
#!/usr/bin/env python3
"""Proxmox VE collector for NetBox via Diode SDK.
Discovers nodes, QEMU VMs, LXC containers, interfaces, IPs, and disks
from a Proxmox VE host and ingests them into NetBox through the Diode pipeline.
"""
import argparse
import logging
import os
import re
import sys
from proxmoxer import ProxmoxAPI
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
Cluster,
ClusterType,
Device,
DeviceRole,
DeviceType,
Entity,
Interface,
IPAddress,
Manufacturer,
Platform,
Site,
VirtualDisk,
VirtualMachine,
VMInterface,
)
log = logging.getLogger("proxmox-collector")
# ---------------------------------------------------------------------------
# Status / type mapping tables
# ---------------------------------------------------------------------------
PVE_TO_NETBOX_STATUS = {
"running": "active",
"stopped": "offline",
"paused": "offline",
"suspended": "offline",
"unknown": "planned",
}
PVE_OSTYPE_MAP = {
# QEMU
"l26": "Linux",
"l24": "Linux",
"win10": "Windows 10/Server 2016+",
"win11": "Windows 11/Server 2022+",
"win8": "Windows 8/Server 2012",
"win7": "Windows 7/Server 2008 R2",
"wxp": "Windows XP",
"w2k": "Windows 2000",
"solaris": "Solaris",
"other": "Other",
# LXC
"debian": "Debian",
"ubuntu": "Ubuntu",
"centos": "CentOS",
"fedora": "Fedora",
"opensuse": "openSUSE",
"archlinux": "Arch Linux",
"alpine": "Alpine Linux",
"gentoo": "Gentoo",
"nixos": "NixOS",
"unmanaged": "Unmanaged",
}
PVE_IFACE_TYPE_MAP = {
"eth": "1000base-t",
"bond": "lag",
"bridge": "bridge",
"vlan": "virtual",
"OVSBridge": "bridge",
"OVSBond": "lag",
"OVSPort": "virtual",
"OVSIntPort": "virtual",
"veth": "virtual",
"lo": "virtual",
}
QEMU_DISK_RE = re.compile(r"^(scsi|virtio|ide|sata|efidisk)\d+$")
LXC_DISK_RE = re.compile(r"^(rootfs|mp\d+)$")
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
"""Load key=value pairs from a .env file into os.environ."""
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, _, value = line.partition("=")
key = key.strip()
value = value.strip().strip("\"'")
os.environ.setdefault(key, value)
def get_diode_config() -> dict:
"""Read Diode connection config from environment variables."""
cfg = {
"diode_target": os.getenv("DIODE_TARGET", "grpc://localhost:8080/diode"),
"client_id": os.getenv("INGESTER_CLIENT_ID", os.getenv("DIODE_CLIENT_ID", "diode-ingester")),
"client_secret": os.getenv("INGESTER_CLIENT_SECRET", os.getenv("DIODE_CLIENT_SECRET")),
}
if not cfg["client_secret"]:
log.error("Missing required env var: INGESTER_CLIENT_SECRET or DIODE_CLIENT_SECRET")
sys.exit(1)
return cfg
def get_pve_hosts() -> list[dict]:
"""Build list of PVE host configs from numbered env vars.
Supports PVE_HOST_1/PVE_USER_1/... through PVE_HOST_N.
The legacy single PVE_HOST, if set and not already listed, is added as the first entry.
"""
hosts = []
# Scan numbered PVE_HOST_1 through PVE_HOST_N (stop after 3 consecutive misses)
misses = 0
for i in range(1, 100):
host = os.getenv(f"PVE_HOST_{i}")
if host is None:
misses += 1
if misses >= 3:
break
continue
misses = 0
hosts.append({
"pve_host": host,
"pve_user": os.getenv(f"PVE_USER_{i}", os.getenv("PVE_USER", "root@pam")),
"pve_token_name": os.getenv(f"PVE_TOKEN_NAME_{i}", os.getenv("PVE_TOKEN_NAME")),
"pve_token_value": os.getenv(f"PVE_TOKEN_VALUE_{i}", os.getenv("PVE_TOKEN_VALUE")),
"pve_verify_ssl": os.getenv(
f"PVE_VERIFY_SSL_{i}", os.getenv("PVE_VERIFY_SSL", "false")
).lower() in ("true", "1", "yes"),
"pve_port": int(os.getenv(f"PVE_PORT_{i}", os.getenv("PVE_PORT", "8006"))),
"site_name": os.getenv(f"PVE_SITE_{i}", os.getenv("SITE_NAME", "main")),
})
# Also include legacy PVE_HOST if it exists and isn't already in the list
legacy_host = os.getenv("PVE_HOST")
if legacy_host:
already_listed = any(h["pve_host"] == legacy_host for h in hosts)
if not already_listed:
hosts.insert(0, {
"pve_host": legacy_host,
"pve_user": os.getenv("PVE_USER", "root@pam"),
"pve_token_name": os.getenv("PVE_TOKEN_NAME"),
"pve_token_value": os.getenv("PVE_TOKEN_VALUE"),
"pve_verify_ssl": os.getenv("PVE_VERIFY_SSL", "false").lower() in ("true", "1", "yes"),
"pve_port": int(os.getenv("PVE_PORT", "8006")),
"site_name": os.getenv("SITE_NAME", "main"),
})
if not hosts:
log.error("No PVE hosts configured (set PVE_HOST or PVE_HOST_1)")
sys.exit(1)
# Validate each host
for i, h in enumerate(hosts):
missing = [k for k in ("pve_host", "pve_token_name", "pve_token_value") if not h.get(k)]
if missing:
log.error("PVE host %d (%s): missing %s", i + 1, h.get("pve_host", "?"), ", ".join(missing))
sys.exit(1)
return hosts
# ---------------------------------------------------------------------------
# PVE connection
# ---------------------------------------------------------------------------
def connect_pve(config: dict) -> ProxmoxAPI:
"""Create and return a ProxmoxAPI connection."""
return ProxmoxAPI(
config["pve_host"],
port=config["pve_port"],
user=config["pve_user"],
token_name=config["pve_token_name"],
token_value=config["pve_token_value"],
verify_ssl=config["pve_verify_ssl"],
backend="https",
)
# ---------------------------------------------------------------------------
# Data collection (pure PVE API calls)
# ---------------------------------------------------------------------------
def collect_node_info(prox: ProxmoxAPI, node: str) -> dict:
return prox.nodes(node).status.get()
def collect_node_networks(prox: ProxmoxAPI, node: str) -> list[dict]:
return prox.nodes(node).network.get()
def collect_qemu_vms(prox: ProxmoxAPI, node: str) -> list[dict]:
return prox.nodes(node).qemu.get()
def collect_vm_config(prox: ProxmoxAPI, node: str, vmid: int) -> dict:
return prox.nodes(node).qemu(vmid).config.get()
def collect_vm_guest_agent_ips(prox: ProxmoxAPI, node: str, vmid: int) -> list[dict] | None:
try:
resp = prox.nodes(node).qemu(vmid).agent("network-get-interfaces").get()
return resp.get("result", resp) if isinstance(resp, dict) else resp
except Exception as exc:
log.debug("Guest agent unavailable for VM %s: %s", vmid, exc)
return None
def collect_lxc_interfaces(prox: ProxmoxAPI, node: str, vmid: int) -> list[dict] | None:
"""Get runtime network interfaces for a running LXC container (includes DHCP IPs)."""
try:
resp = prox.nodes(node).lxc(vmid).interfaces.get()
return resp if resp else None
except Exception as exc:
log.debug("Interfaces unavailable for LXC %s: %s", vmid, exc)
return None
def collect_lxc_containers(prox: ProxmoxAPI, node: str) -> list[dict]:
return prox.nodes(node).lxc.get()
def collect_lxc_config(prox: ProxmoxAPI, node: str, vmid: int) -> dict:
return prox.nodes(node).lxc(vmid).config.get()
# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------
def parse_pve_net_config(raw: str) -> dict:
"""Parse QEMU net config: 'virtio=AA:BB:CC:DD:EE:FF,bridge=vmbr0,firewall=1'."""
result = {}
parts = raw.split(",")
for part in parts:
if "=" not in part:
continue
k, v = part.split("=", 1)
result[k] = v
# The first key=value is model=mac (e.g. virtio=AA:BB:...)
for k, v in list(result.items()):  # iterate over a snapshot; the body adds keys to result
if re.match(r"^[0-9A-Fa-f]{2}(:[0-9A-Fa-f]{2}){5}$", v):
result["model"] = k
result["mac"] = v
break
return result
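A minimal sketch of the parsing step above, restated so that it runs on its own. The names `MAC_RE` and `parse_net` are illustrative and not part of the collector; the input string is the one from the docstring.

```python
import re

# Matches a colon-separated MAC address such as AA:BB:CC:DD:EE:FF.
MAC_RE = re.compile(r"^[0-9A-Fa-f]{2}(:[0-9A-Fa-f]{2}){5}$")


def parse_net(raw: str) -> dict:
    """Split 'k=v,k=v' pairs, then label the pair whose value is a MAC."""
    result = dict(part.split("=", 1) for part in raw.split(",") if "=" in part)
    # Iterate over a snapshot because the loop body adds keys to result.
    for key, value in list(result.items()):
        if MAC_RE.match(value):
            result["model"] = key   # e.g. "virtio"
            result["mac"] = value   # e.g. "AA:BB:CC:DD:EE:FF"
            break
    return result


parsed = parse_net("virtio=AA:BB:CC:DD:EE:FF,bridge=vmbr0,firewall=1")
print(parsed["model"], parsed["mac"], parsed["bridge"])
```

The original `model=mac` pair is kept under its own key as well, so callers can still look up `parsed["virtio"]` if they need to.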
def parse_lxc_net_config(raw: str) -> dict:
"""Parse LXC net config: 'name=eth0,bridge=vmbr0,hwaddr=...,ip=10.0.0.5/24'."""
result = {}
for part in raw.split(","):
if "=" not in part:
continue
k, v = part.split("=", 1)
result[k] = v
return result
def parse_disk_size(size_str: str) -> int:
"""Parse '32G', '512M', '1T' to integer GB."""
m = re.match(r"(\d+(?:\.\d+)?)\s*([GMTK]?)", size_str, re.IGNORECASE)
if not m:
return 0
value = float(m.group(1))
unit = m.group(2).upper()
if unit == "T":
return int(value * 1024)
if unit in ("G", ""):
return int(value)
if unit == "M":
return max(1, int(value / 1024))
if unit == "K":
return max(1, int(value / (1024 * 1024)))
return int(value)
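The unit handling above can be checked with a few inputs. This is a standalone sketch that mirrors the function body so the snippet runs by itself; the sample strings other than those in the docstring are made up for illustration.

```python
import re


def parse_disk_size(size_str: str) -> int:
    """Convert a PVE size string such as '32G' to whole gigabytes."""
    m = re.match(r"(\d+(?:\.\d+)?)\s*([GMTK]?)", size_str, re.IGNORECASE)
    if not m:
        return 0
    value = float(m.group(1))
    unit = m.group(2).upper()
    if unit == "T":
        return int(value * 1024)
    if unit == "M":
        # Sub-gigabyte sizes are rounded up to 1 rather than reported as 0.
        return max(1, int(value / 1024))
    if unit == "K":
        return max(1, int(value / (1024 * 1024)))
    return int(value)  # "G" or no unit at all


sizes = {s: parse_disk_size(s) for s in ("32G", "512M", "1T", "2048M", "bogus")}
print(sizes)
```

Note that a 512M disk is reported as 1 GB, so summed totals can slightly overstate the real size for small volumes.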
def parse_pve_disk_config(raw: str) -> dict:
"""Parse PVE disk config: 'local-lvm:vm-100-disk-0,size=32G'."""
result = {"storage": "", "volume": "", "size_gb": 0}
parts = raw.split(",")
# First part is storage:volume
if parts:
sv = parts[0]
if ":" in sv:
result["storage"], result["volume"] = sv.split(":", 1)
else:
result["volume"] = sv
# Find size=
for part in parts:
if part.startswith("size="):
result["size_gb"] = parse_disk_size(part[5:])
break
return result
# ---------------------------------------------------------------------------
# Mapping helpers
# ---------------------------------------------------------------------------
def map_pve_status(pve_status: str) -> str:
return PVE_TO_NETBOX_STATUS.get(pve_status.lower(), "active")
def map_pve_interface_type(pve_type: str, iface_name: str) -> str:
if pve_type in PVE_IFACE_TYPE_MAP:
return PVE_IFACE_TYPE_MAP[pve_type]
if iface_name.startswith(("eno", "enp", "ens", "eth")):
return "1000base-t"
if iface_name.startswith("vmbr"):
return "bridge"
return "other"
def map_ostype(ostype: str) -> str:
return PVE_OSTYPE_MAP.get(ostype, ostype or "Other")
def build_mac_to_netkey_map(vm_config: dict) -> dict[str, str]:
"""Build MAC address -> PVE net key mapping (e.g. 'AA:BB:...' -> 'net0')."""
mac_map = {}
for key, value in vm_config.items():
if not re.match(r"^net\d+$", key):
continue
parsed = parse_pve_net_config(str(value))
mac = parsed.get("mac", "").lower()
if mac:
mac_map[mac] = key
return mac_map
def sum_disk_sizes(vm_config: dict, vm_type: str) -> int:
"""Sum all disk sizes from a VM/LXC config, return total in GB."""
pattern = QEMU_DISK_RE if vm_type == "qemu" else LXC_DISK_RE
total = 0
for key, value in vm_config.items():
if not pattern.match(key):
continue
raw = str(value)
if "media=cdrom" in raw or raw.startswith("none"):
continue
disk = parse_pve_disk_config(raw)
total += disk["size_gb"]
return total
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def build_cluster_entity(node_name: str, site_name: str) -> Entity:
return Entity(cluster=Cluster(
name=node_name,
type=ClusterType(name="Proxmox VE"),
scope_site=Site(name=site_name),
status="active",
tags=["proxmox"],
))
def build_node_device_entity(node_name: str, node_status: dict, site_name: str) -> Entity:
cpuinfo = node_status.get("cpuinfo", {})
pve_version = node_status.get("pveversion", "")
kernel = node_status.get("kversion", "")
mem_gb = node_status.get("memory", {}).get("total", 0) / (1024 ** 3)
return Entity(device=Device(
name=node_name,
device_type=DeviceType(
model="Proxmox VE Host",
manufacturer=Manufacturer(name="Proxmox Server Solutions GmbH"),
),
role=DeviceRole(name="Hypervisor"),
platform=Platform(
name="Proxmox VE",
manufacturer=Manufacturer(name="Proxmox Server Solutions GmbH"),
),
site=Site(name=site_name),
status="active",
description=f"PVE {pve_version}, kernel {kernel}",
comments=f"CPU: {cpuinfo.get('model', 'N/A')}, "
f"Sockets: {cpuinfo.get('sockets', '?')}, "
f"Cores: {cpuinfo.get('cores', '?')}, "
f"Threads: {cpuinfo.get('cpus', '?')}, "
f"Memory: {mem_gb:.1f} GB",
tags=["proxmox", "hypervisor"],
))
def build_node_interface_entities(
node_name: str, interfaces: list[dict], site_name: str,
) -> list[Entity]:
entities = []
for iface in interfaces:
name = iface.get("iface", "")
if name == "lo":
continue
pve_type = iface.get("type", "")
entities.append(Entity(interface=Interface(
device=Device(name=node_name, site=Site(name=site_name)),
name=name,
type=map_pve_interface_type(pve_type, name),
enabled=bool(iface.get("active", 0)),
mtu=int(iface["mtu"]) if iface.get("mtu") else None,
description=_iface_description(iface),
tags=["proxmox"],
)))
return entities
def _iface_description(iface: dict) -> str:
parts = []
if iface.get("bridge_ports"):
parts.append(f"bridge_ports: {iface['bridge_ports']}")
if iface.get("bond_slaves") or iface.get("slaves"):
parts.append(f"slaves: {iface.get('bond_slaves') or iface.get('slaves')}")
if iface.get("comments"):
parts.append(iface["comments"])
return ", ".join(parts) if parts else ""
def build_node_ip_entities(
node_name: str, interfaces: list[dict], site_name: str,
) -> list[Entity]:
entities = []
for iface in interfaces:
name = iface.get("iface", "")
if name == "lo":
continue
cidr = iface.get("cidr")
address = iface.get("address")
netmask = iface.get("netmask")
if cidr:
ip_str = cidr
elif address and netmask:
ip_str = f"{address}/{_netmask_to_prefix(netmask)}"
else:
continue
pve_type = iface.get("type", "")
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=Device(
name=node_name,
device_type=DeviceType(
model="Proxmox VE Host",
manufacturer=Manufacturer(name="Proxmox Server Solutions GmbH"),
),
role=DeviceRole(name="Hypervisor"),
site=Site(name=site_name),
),
name=name,
type=map_pve_interface_type(pve_type, name),
),
tags=["proxmox"],
)))
return entities
def _netmask_to_prefix(mask: str) -> int:
try:
return sum(bin(int(o)).count("1") for o in mask.split("."))
except (ValueError, AttributeError):
return 24
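The bit-counting approach above accepts any dotted quad, including non-contiguous masks. As a possible alternative (not what the collector does), the standard-library `ipaddress` module can validate the mask while converting it. The function name `netmask_to_prefix` and its `default` parameter are illustrative.

```python
import ipaddress


def netmask_to_prefix(mask: str, default: int = 24) -> int:
    """Return the prefix length for a dotted-quad netmask, or default if invalid."""
    try:
        # IPv4Network parses "address/netmask" and rejects non-contiguous masks.
        return ipaddress.IPv4Network(f"0.0.0.0/{mask}").prefixlen
    except ValueError:
        return default


print(netmask_to_prefix("255.255.255.0"))
print(netmask_to_prefix("255.255.252.0"))
```

One caveat of this sketch: `ipaddress` also accepts host masks such as `0.0.0.255`, so it is stricter than bit counting but not a full input validator.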
def _first_ipv4(ip_entities: list[Entity]) -> str | None:
"""Extract the first IPv4 address (with prefix) from a list of IPAddress entities."""
for ent in ip_entities:
addr = ent.ip_address.address
if addr and ":" not in addr: # skip IPv6
return addr
return None
def _vm_ref(vm_name: str, node_name: str, site_name: str, role_name: str) -> VirtualMachine:
"""Build a rich VirtualMachine reference with enough context for the reconciler."""
return VirtualMachine(
name=vm_name,
site=Site(name=site_name),
cluster=Cluster(
name=node_name,
type=ClusterType(name="Proxmox VE"),
scope_site=Site(name=site_name),
),
role=DeviceRole(name=role_name),
)
def build_vm_entity(
vm_data: dict, vm_config: dict, node_name: str, site_name: str, vm_type: str,
primary_ip4: str | None = None,
) -> Entity:
vm_name = vm_config.get("hostname") or vm_data.get("name") or f"vm-{vm_data['vmid']}"
memory_mb = int(vm_config.get("memory", 0))
if vm_type == "qemu":
vcpus = int(vm_config.get("cores", 1)) * int(vm_config.get("sockets", 1))
role_name = "Virtual Machine"
tags = ["proxmox", "qemu"]
else:
vcpus = int(vm_config.get("cores", 1))
role_name = "LXC Container"
tags = ["proxmox", "lxc"]
ostype = vm_config.get("ostype", "other")
disk_gb = sum_disk_sizes(vm_config, vm_type)
vm_kwargs = dict(
name=vm_name,
status=map_pve_status(vm_data.get("status", "unknown")),
site=Site(name=site_name),
cluster=Cluster(
name=node_name,
type=ClusterType(name="Proxmox VE"),
scope_site=Site(name=site_name),
),
role=DeviceRole(name=role_name),
platform=Platform(name=map_ostype(ostype)),
vcpus=float(vcpus),
memory=memory_mb,
disk=disk_gb,
description=f"VMID: {vm_data['vmid']}",
comments=vm_config.get("description", ""),
tags=tags,
)
if primary_ip4:
vm_kwargs["primary_ip4"] = IPAddress(address=primary_ip4)
return Entity(virtual_machine=VirtualMachine(**vm_kwargs))
def build_vm_interface_entities(
vm_name: str, vm_config: dict, vm_type: str,
node_name: str = "", site_name: str = "",
) -> list[Entity]:
role_name = "Virtual Machine" if vm_type == "qemu" else "LXC Container"
entities = []
for key, value in sorted(vm_config.items()):
if not re.match(r"^net\d+$", key):
continue
raw = str(value)
if vm_type == "lxc":
parsed = parse_lxc_net_config(raw)
name = parsed.get("name", key)
else:
parsed = parse_pve_net_config(raw)
name = key
bridge = parsed.get("bridge", "N/A")
model = parsed.get("model", "veth")
entities.append(Entity(vm_interface=VMInterface(
virtual_machine=_vm_ref(vm_name, node_name, site_name, role_name),
name=name,
enabled=True,
description=f"bridge={bridge}, model={model}",
tags=["proxmox"],
)))
return entities
def build_vm_disk_entities(
vm_name: str, vm_config: dict, vm_type: str,
node_name: str = "", site_name: str = "",
) -> list[Entity]:
role_name = "Virtual Machine" if vm_type == "qemu" else "LXC Container"
pattern = QEMU_DISK_RE if vm_type == "qemu" else LXC_DISK_RE
entities = []
for key, value in sorted(vm_config.items()):
if not pattern.match(key):
continue
raw = str(value)
if "media=cdrom" in raw or raw.startswith("none"):
continue
disk = parse_pve_disk_config(raw)
if disk["size_gb"] == 0:
continue
entities.append(Entity(virtual_disk=VirtualDisk(
virtual_machine=_vm_ref(vm_name, node_name, site_name, role_name),
name=key,
size=disk["size_gb"],
description=f"{disk['storage']}:{disk['volume']}",
tags=["proxmox"],
)))
return entities
def build_vm_ip_entities_from_guest_agent(
vm_name: str, agent_ifaces: list[dict], mac_map: dict[str, str],
node_name: str = "", site_name: str = "",
) -> list[Entity]:
entities = []
for ga_iface in agent_ifaces:
ga_name = ga_iface.get("name", "unknown")
mac = ga_iface.get("hardware-address", "").lower()
pve_key = mac_map.get(mac, ga_name)
for ip_info in ga_iface.get("ip-addresses", []):
addr = ip_info.get("ip-address", "")
prefix = ip_info.get("prefix", 24)
if not addr:
continue
if addr.startswith("127.") or addr == "::1" or addr.startswith("fe80::"):
continue
entities.append(Entity(ip_address=IPAddress(
address=f"{addr}/{prefix}",
status="active",
assigned_object_vm_interface=VMInterface(
virtual_machine=_vm_ref(vm_name, node_name, site_name, "Virtual Machine"),
name=pve_key,
),
tags=["proxmox", "guest-agent"],
)))
return entities
def build_lxc_ip_entities(
vm_name: str, vm_config: dict, runtime_ifaces: list[dict] | None = None,
node_name: str = "", site_name: str = "",
) -> list[Entity]:
"""Build IP entities for LXC from static config and/or runtime interfaces."""
entities = []
vm = _vm_ref(vm_name, node_name, site_name, "LXC Container")
# First try runtime interfaces (covers DHCP and static)
if runtime_ifaces:
for iface in runtime_ifaces:
iface_name = iface.get("name", "unknown")
if iface_name == "lo":
continue
for ip_info in iface.get("ip-addresses", []):
addr = ip_info.get("ip-address", "")
prefix = ip_info.get("prefix", "24")
if not addr:
continue
if addr.startswith("127.") or addr == "::1" or addr.startswith("fe80::"):
continue
entities.append(Entity(ip_address=IPAddress(
address=f"{addr}/{prefix}",
status="active",
assigned_object_vm_interface=VMInterface(
virtual_machine=vm,
name=iface_name,
),
tags=["proxmox", "lxc"],
)))
return entities
# Fallback: static IPs from config (for stopped containers)
for key, value in sorted(vm_config.items()):
if not re.match(r"^net\d+$", key):
continue
parsed = parse_lxc_net_config(str(value))
iface_name = parsed.get("name", key)
for ip_field in ("ip", "ip6"):
ip_val = parsed.get(ip_field, "")
if not ip_val or ip_val in ("dhcp", "auto", "manual"):
continue
entities.append(Entity(ip_address=IPAddress(
address=ip_val,
status="active",
assigned_object_vm_interface=VMInterface(
virtual_machine=vm,
name=iface_name,
),
tags=["proxmox", "lxc"],
)))
return entities
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def collect_all_entities(prox: ProxmoxAPI, config: dict) -> list[Entity]:
site = config["site_name"]
entities: list[Entity] = []
nodes = prox.nodes.get()
log.info("Found %d node(s)", len(nodes))
for node_data in nodes:
node_name = node_data["node"]
log.info("Processing node: %s", node_name)
# Cluster
entities.append(build_cluster_entity(node_name, site))
# Node device
node_status = collect_node_info(prox, node_name)
entities.append(build_node_device_entity(node_name, node_status, site))
# Node interfaces + IPs
node_nets = collect_node_networks(prox, node_name)
entities.extend(build_node_interface_entities(node_name, node_nets, site))
entities.extend(build_node_ip_entities(node_name, node_nets, site))
# QEMU VMs
qemu_vms = collect_qemu_vms(prox, node_name)
log.info(" QEMU VMs: %d", len(qemu_vms))
for vm in qemu_vms:
vmid = vm["vmid"]
try:
vm_cfg = collect_vm_config(prox, node_name, vmid)
vm_name = vm_cfg.get("hostname") or vm.get("name") or f"vm-{vmid}"
log.info(" VM %s (VMID %s)", vm_name, vmid)
# Collect IPs first so we can set primary_ip4
mac_map = build_mac_to_netkey_map(vm_cfg)
agent_data = collect_vm_guest_agent_ips(prox, node_name, vmid)
ip_entities = []
if agent_data:
ip_entities = build_vm_ip_entities_from_guest_agent(
vm_name, agent_data, mac_map, node_name, site)
log.info(" Guest agent IPs collected")
else:
log.debug(" No guest agent for VM %s", vmid)
primary_ip4 = _first_ipv4(ip_entities)
entities.append(build_vm_entity(vm, vm_cfg, node_name, site, "qemu", primary_ip4))
entities.extend(build_vm_interface_entities(vm_name, vm_cfg, "qemu", node_name, site))
entities.extend(build_vm_disk_entities(vm_name, vm_cfg, "qemu", node_name, site))
entities.extend(ip_entities)
except Exception:
log.exception("Failed to process QEMU VM %s", vmid)
# LXC containers
lxc_cts = collect_lxc_containers(prox, node_name)
log.info(" LXC containers: %d", len(lxc_cts))
for ct in lxc_cts:
vmid = ct["vmid"]
try:
ct_cfg = collect_lxc_config(prox, node_name, vmid)
ct_name = ct_cfg.get("hostname") or ct.get("name") or f"ct-{vmid}"
log.info(" CT %s (VMID %s)", ct_name, vmid)
# Collect IPs first so we can set primary_ip4
lxc_ifaces = collect_lxc_interfaces(prox, node_name, vmid)
ip_entities = build_lxc_ip_entities(ct_name, ct_cfg, lxc_ifaces, node_name, site)
primary_ip4 = _first_ipv4(ip_entities)
entities.append(build_vm_entity(ct, ct_cfg, node_name, site, "lxc", primary_ip4))
entities.extend(build_vm_interface_entities(ct_name, ct_cfg, "lxc", node_name, site))
entities.extend(build_vm_disk_entities(ct_name, ct_cfg, "lxc", node_name, site))
entities.extend(ip_entities)
except Exception:
log.exception("Failed to process LXC container %s", vmid)
return entities
def ingest_entities(entities: list[Entity], config: dict, dry_run: bool = False) -> None:
if dry_run:
client = DiodeDryRunClient(app_name="proxmox-collector")
log.info("Dry-run mode: writing entities to stdout")
client.ingest(entities=entities)
return
with DiodeClient(
target=config["diode_target"],
client_id=config["client_id"],
client_secret=config["client_secret"],
app_name="proxmox-collector",
app_version="0.1.0",
) as client:
log.info("Ingesting %d entities to %s ...", len(entities), config["diode_target"])
response = client.ingest(entities=entities)
if response.errors:
log.error("Ingestion errors: %s", response.errors)
else:
log.info("Ingestion successful")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="Proxmox VE collector for NetBox via Diode")
parser.add_argument("--dry-run", action="store_true", help="Output entities as JSON without ingesting")
parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env", help="Path to .env file")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(levelname)-8s %(message)s",
)
load_dotenv(args.env_file)
diode_config = get_diode_config()
pve_hosts = get_pve_hosts()
all_entities: list[Entity] = []
for i, host_cfg in enumerate(pve_hosts, 1):
log.info("=== PVE Host %d/%d: %s ===", i, len(pve_hosts), host_cfg["pve_host"])
try:
prox = connect_pve(host_cfg)
entities = collect_all_entities(prox, host_cfg)
log.info("Collected %d entities from %s", len(entities), host_cfg["pve_host"])
all_entities.extend(entities)
except Exception:
log.exception("Failed to collect from PVE host %s", host_cfg["pve_host"])
log.info("Total: %d entities from %d host(s)", len(all_entities), len(pve_hosts))
if not all_entities:
log.warning("No entities collected. Exiting.")
return
ingest_entities(all_entities, diode_config, dry_run=args.dry_run)
log.info("Done.")
if __name__ == "__main__":
main()


@ -0,0 +1,754 @@
#!/usr/bin/env python3
"""UniFi collector for NetBox via Diode SDK.
Discovers Ubiquiti UniFi devices (UDM, switches, APs), their ports, radios,
VLANs, WLANs, and LLDP neighbors from the local UniFi Controller API and
ingests them into NetBox.
Supports:
- UDM Pro / UDM-SE / UDR / Cloud Key Gen2+ (UniFi OS)
- Legacy UniFi Controller (standalone)
Usage:
python collectors/unifi_collector.py --dry-run
python collectors/unifi_collector.py
"""
import argparse
import logging
import os
import sys
from unifi_controller_api import UnifiController
from netboxlabs.diode.sdk import DiodeClient
from netboxlabs.diode.sdk.ingester import (
Cable,
CableTermination,
CustomFieldValue,
Device,
DeviceRole,
DeviceType,
Entity,
GenericObject,
Interface,
IPAddress,
Manufacturer,
Platform,
Prefix,
Site,
VLAN,
VLANGroup,
WirelessLAN,
WirelessLANGroup,
)
log = logging.getLogger("unifi-collector")
# ---------------------------------------------------------------------------
# UniFi → NetBox mappings
# ---------------------------------------------------------------------------
# UniFi device type strings → NetBox role
DEVICE_TYPE_TO_ROLE = {
"ugw": "Firewall", # UniFi Gateway (USG, UDM, UDM-SE, etc.)
"usw": "Switch", # UniFi Switch
"uap": "Access Point", # UniFi AP
"uxg": "Firewall", # UniFi Next-Gen Gateway
"udm": "Firewall", # UniFi Dream Machine
"uck": "Server", # Cloud Key
}
# UniFi device type → NAPALM-style platform name
DEVICE_TYPE_TO_PLATFORM = {
"ugw": "Ubiquiti UniFi OS",
"usw": "Ubiquiti UniFi OS",
"uap": "Ubiquiti UniFi OS",
"uxg": "Ubiquiti UniFi OS",
"udm": "Ubiquiti UniFi OS",
"uck": "Ubiquiti UniFi OS",
}
# UniFi port speed → NetBox interface type
SPEED_TO_IFACE_TYPE = {
10: "100base-tx", # 10 Mbps: map to the closest (lowest-speed) copper type in this table
100: "100base-tx",
1000: "1000base-t",
2500: "2.5gbase-t",
5000: "5gbase-t",
10000: "10gbase-t",
25000: "25gbase-x-sfp28",
40000: "40gbase-x-qsfpp",
}
# WiFi band codes → NetBox wireless interface types
RADIO_BAND_TO_TYPE = {
"ng": "ieee-802.11n", # 2.4 GHz
"na": "ieee-802.11ac", # 5 GHz
"ac": "ieee-802.11ac", # 5 GHz 802.11ac
"ax": "ieee-802.11ax", # WiFi 6
"6e": "ieee-802.11ax", # WiFi 6E
"be": "ieee-802.11ax", # WiFi 7 (closest mapping)
}
# WiFi band display names
RADIO_BAND_DISPLAY = {
"ng": "2.4 GHz",
"na": "5 GHz",
"ac": "5 GHz",
"ax": "WiFi 6",
"6e": "6 GHz",
"be": "WiFi 7",
}
# WiFi standard from band code
BAND_TO_STANDARD = {
"ng": "ieee-802.11n",
"na": "ieee-802.11ac",
"ac": "ieee-802.11ac",
"ax": "ieee-802.11ax",
"6e": "ieee-802.11ax",
"be": "ieee-802.11ax",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_config() -> dict:
return {
"host": os.environ.get("UNIFI_HOST", ""),
"user": os.environ.get("UNIFI_USER", ""),
"password": os.environ.get("UNIFI_PASSWORD", ""),
"site_id": os.environ.get("UNIFI_SITE", "default"),
"verify_ssl": os.environ.get("UNIFI_VERIFY_SSL", "false").lower() == "true",
"is_udm": os.environ.get("UNIFI_IS_UDM", "true").lower() == "true",
"netbox_site": os.environ.get("UNIFI_NETBOX_SITE", "main"),
}
# ---------------------------------------------------------------------------
# Device reference helper
# ---------------------------------------------------------------------------
def _device_ref(name: str, model: str, role: str, site_name: str) -> Device:
return Device(
name=name,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name="Ubiquiti"),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
)
# ---------------------------------------------------------------------------
# Connection
# ---------------------------------------------------------------------------
def connect_unifi(cfg: dict) -> UnifiController:
host = cfg["host"]
if not host:
log.error("UNIFI_HOST not set")
sys.exit(1)
url = host if host.startswith("http") else f"https://{host}"
log.info("Connecting to UniFi controller at %s ...", url)
controller = UnifiController(
controller_url=url,
username=cfg["user"],
password=cfg["password"],
is_udm_pro=cfg["is_udm"],
verify_ssl=cfg["verify_ssl"],
)
log.info("Connected to UniFi controller")
return controller
# ---------------------------------------------------------------------------
# Data collection and entity building
# ---------------------------------------------------------------------------
def collect_all_entities(cfg: dict) -> list[Entity]:
controller = connect_unifi(cfg)
site_id = cfg["site_id"]
site_name = cfg["netbox_site"]
entities: list[Entity] = []
# Track device names for LLDP cable dedup
mac_to_device: dict[str, dict] = {}
all_lldp_entries: list[tuple[str, str, dict]] = [] # (device_name, local_port, lldp_entry)
# --- Devices (UDM, switches, APs) ---
log.info("Fetching devices from site '%s' ...", site_id)
try:
devices = controller.get_unifi_site_device(site_id, raw=True)
except Exception as exc:
log.error("Failed to fetch devices: %s", exc)
devices = []
if not devices:
log.warning("No devices returned from UniFi controller")
return entities
log.info("Found %d UniFi devices", len(devices))
for dev in devices:
try:
dev_entities, dev_lldp = _build_device_entities(dev, site_name, mac_to_device)
entities.extend(dev_entities)
all_lldp_entries.extend(dev_lldp)
except Exception as exc:
dev_name = dev.get("name") or dev.get("mac", "?")
log.error("Failed to process device %s: %s", dev_name, exc)
# --- Cables from LLDP ---
cable_entities = _build_cable_entities(all_lldp_entries, mac_to_device, site_name)
entities.extend(cable_entities)
# --- Networks / VLANs ---
log.info("Fetching network configurations ...")
try:
networks = controller.get_unifi_site_networkconf(site_id, raw=True)
log.info("Found %d networks", len(networks))
for net in networks:
try:
entities.extend(_build_network_entities(net, site_name))
except Exception as exc:
log.error("Failed to process network %s: %s",
net.get("name", "?"), exc)
except Exception as exc:
log.warning("Failed to fetch networks: %s", exc)
# --- WLANs ---
log.info("Fetching WLAN configurations ...")
try:
wlans = controller.get_unifi_site_wlanconf(site_id, raw=True)
log.info("Found %d WLANs", len(wlans))
for wlan in wlans:
try:
entities.extend(_build_wlan_entities(wlan, site_name))
except Exception as exc:
log.error("Failed to process WLAN %s: %s",
wlan.get("name", "?"), exc)
except Exception as exc:
log.warning("Failed to fetch WLANs: %s", exc)
return entities
# ---------------------------------------------------------------------------
# Device entity builder
# ---------------------------------------------------------------------------
def _build_device_entities(dev: dict, site_name: str,
mac_to_device: dict) -> tuple[list[Entity], list]:
"""Build Device + Interface entities from a UniFi device dict."""
entities: list[Entity] = []
lldp_entries: list[tuple[str, str, dict]] = []
mac = dev.get("mac", "")
name = dev.get("name") or dev.get("hostname") or mac
model_code = dev.get("model", "Unknown")
model_name = dev.get("model_name") or dev.get("model_in_lts") or model_code
dev_type = dev.get("type", "")
serial = dev.get("serial", "")
ip = dev.get("ip", "")
version = dev.get("version", "")
state = dev.get("state", 0)
adopted = dev.get("adopted", False)
role = DEVICE_TYPE_TO_ROLE.get(dev_type, "Network Device")
platform = DEVICE_TYPE_TO_PLATFORM.get(dev_type, "Ubiquiti UniFi OS")
# Device status
if state == 1 and adopted:
status = "active"
elif state == 0:
status = "offline"
else:
status = "planned"
# Track MAC → device info for LLDP
mac_to_device[mac.lower()] = {
"name": name,
"model": model_name,
"role": role,
}
# Custom fields
custom_fields = {}
if mac:
custom_fields["unifi_mac"] = CustomFieldValue(text=mac)
if version:
custom_fields["unifi_firmware"] = CustomFieldValue(text=version)
# --- Device entity ---
device_kwargs = dict(
name=name,
device_type=DeviceType(
model=model_name,
manufacturer=Manufacturer(name="Ubiquiti"),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
platform=Platform(name=platform),
serial=serial[:50] if serial else "",
status=status,
tags=["unifi"],
)
if custom_fields:
device_kwargs["custom_fields"] = custom_fields
entities.append(Entity(device=Device(**device_kwargs)))
dev_ref = _device_ref(name, model_name, role, site_name)
# --- Management IP ---
if ip and ip != "0.0.0.0":
# Management interface
entities.append(Entity(interface=Interface(
device=dev_ref,
name="mgmt",
type="other",
mac_address=mac,
enabled=True,
description="Management interface",
tags=["unifi"],
)))
entities.append(Entity(ip_address=IPAddress(
address=f"{ip}/32",
status="active",
assigned_object_interface=Interface(
device=dev_ref,
name="mgmt",
type="other",
),
tags=["unifi"],
)))
# --- Switch ports (port_table) ---
port_table = dev.get("port_table") or []
for port in port_table:
port_entities, port_lldp = _build_port_entities(port, dev_ref, name, mac_to_device)
entities.extend(port_entities)
lldp_entries.extend(port_lldp)
# --- WiFi radios (radio_table) ---
radio_table = dev.get("radio_table") or []
radio_stats = dev.get("radio_table_stats") or []
# Merge stats into radio data
radio_stats_map = {}
for rs in radio_stats:
rname = rs.get("name") or rs.get("radio", "")
if rname:
radio_stats_map[rname] = rs
for radio in radio_table:
entities.extend(_build_radio_entities(radio, radio_stats_map, dev_ref))
# --- Ethernet interfaces (ethernet_table) ---
eth_table = dev.get("ethernet_table") or []
for eth in eth_table:
eth_mac = eth.get("mac", "")
eth_name = eth.get("name") or eth.get("label", "")
if eth_name and eth_name != "mgmt":
num_ports = eth.get("num_port", 0)
desc = f"{num_ports} ports" if num_ports else ""
entities.append(Entity(interface=Interface(
device=dev_ref,
name=eth_name[:64],
type="other",
mac_address=eth_mac,
enabled=True,
description=desc,
tags=["unifi"],
)))
# --- LLDP entries for cable discovery ---
lldp_info = dev.get("lldp_table") or dev.get("lldp_info") or []
for entry in lldp_info:
local_port = entry.get("local_port_name") or f"port{entry.get('local_port_idx', '?')}"
lldp_entries.append((name, local_port, entry))
return entities, lldp_entries
# ---------------------------------------------------------------------------
# Port entity builder
# ---------------------------------------------------------------------------
def _build_port_entities(port: dict, dev_ref: Device, dev_name: str,
mac_to_device: dict) -> tuple[list[Entity], list]:
"""Build Interface entities from a switch port_table entry."""
entities: list[Entity] = []
lldp_entries: list[tuple[str, str, dict]] = []
port_idx = port.get("port_idx", 0)
port_name = port.get("name") or f"Port {port_idx}"
mac = port.get("mac", "")
speed = int(port.get("speed", 0) or 0)
is_uplink = port.get("is_uplink", False)
up = port.get("up", False)
enabled = port.get("enable", True)
poe_enable = port.get("poe_enable", False)
poe_power = port.get("poe_power") or ""
media = port.get("media") or ""
sfp_found = port.get("sfp_found", False)
# Determine interface type from speed and media
if sfp_found or "SFP" in media.upper():
if speed >= 25000:
iface_type = "25gbase-x-sfp28"
elif speed >= 10000:
iface_type = "10gbase-x-sfpp"
else:
iface_type = "1000base-x-sfp"
else:
iface_type = SPEED_TO_IFACE_TYPE.get(speed, "1000base-t")
# Description parts
desc_parts = []
if is_uplink:
desc_parts.append("Uplink")
if poe_enable:
power_str = f" ({poe_power}W)" if poe_power else ""
desc_parts.append(f"PoE{power_str}")
entities.append(Entity(interface=Interface(
device=dev_ref,
name=port_name[:64],
type=iface_type,
mac_address=mac,
speed=speed * 1000 if speed else 0, # Mbps → Kbps
enabled=enabled,
description=", ".join(desc_parts)[:200] if desc_parts else "",
tags=["unifi"],
)))
# Collect LLDP from port if present
port_lldp = port.get("lldp_table") or []
for entry in port_lldp:
lldp_entries.append((dev_name, port_name, entry))
return entities, lldp_entries
# ---------------------------------------------------------------------------
# Radio entity builder
# ---------------------------------------------------------------------------
def _build_radio_entities(radio: dict, stats_map: dict,
dev_ref: Device) -> list[Entity]:
"""Build Interface entities from a WiFi radio_table entry."""
entities: list[Entity] = []
radio_name = radio.get("name") or radio.get("radio", "wifi0")
band = radio.get("radio", "ng")
channel = radio.get("channel") or ""
tx_power = radio.get("tx_power") or radio.get("tx_power_mode") or ""
ht = radio.get("ht") or ""
band_display = RADIO_BAND_DISPLAY.get(band, band)
iface_type = RADIO_BAND_TO_TYPE.get(band, "ieee-802.11n")
desc_parts = [band_display]
if channel:
desc_parts.append(f"ch{channel}")
if tx_power:
desc_parts.append(f"{tx_power}dBm")
if ht:
desc_parts.append(ht)
# Merge stats if available
stats = stats_map.get(radio_name, {})
num_sta = stats.get("user-num_sta") or stats.get("user_num_sta", 0)
if num_sta:
desc_parts.append(f"{num_sta} clients")
entities.append(Entity(interface=Interface(
device=dev_ref,
name=radio_name[:64],
type=iface_type,
enabled=True,
description=", ".join(desc_parts)[:200],
tags=["unifi", "wireless"],
)))
return entities
# ---------------------------------------------------------------------------
# Cable entity builder (from LLDP)
# ---------------------------------------------------------------------------
def _build_cable_entities(lldp_entries: list[tuple[str, str, dict]],
                          mac_to_device: dict,
                          site_name: str) -> list[Entity]:
    """Build Cable entities from LLDP neighbor data, deduplicating pairs."""
    entities: list[Entity] = []
    seen_cables: set[tuple] = set()
    for dev_name, local_port, entry in lldp_entries:
        chassis_id = (entry.get("chassis_id") or "").lower().replace("-", ":")
        remote_port = entry.get("port_id") or entry.get("port_descr") or ""
        chassis_name = entry.get("chassis_descr") or ""
        if not chassis_id and not chassis_name:
            continue
        # Try to resolve the remote device by MAC; fall back to the LLDP names
        remote_dev = mac_to_device.get(chassis_id, {})
        remote_name = remote_dev.get("name") or chassis_name or chassis_id
        # Dedup: the sorted pair is identical whichever end reported the link
        pair = tuple(sorted([
            (dev_name, local_port),
            (remote_name, remote_port),
        ]))
        if pair in seen_cables:
            continue
        seen_cables.add(pair)
        a_name, a_port = pair[0]
        b_name, b_port = pair[1]
        # Look up device info for both sides (model and role come from here)
        a_info = None
        b_info = None
        for info in mac_to_device.values():
            if info["name"] == a_name:
                a_info = info
            if info["name"] == b_name:
                b_info = info
        a_model = a_info["model"] if a_info else "Unknown"
        a_role = a_info["role"] if a_info else "Network Device"
        b_model = b_info["model"] if b_info else "Unknown"
        b_role = b_info["role"] if b_info else "Network Device"
        a_ref = _device_ref(a_name, a_model, a_role, site_name)
        b_ref = _device_ref(b_name, b_model, b_role, site_name)
        entities.append(Entity(cable=Cable(
            a_terminations=[CableTermination(
                termination=GenericObject(
                    object_interface=Interface(
                        device=a_ref,
                        name=a_port[:64],
                    )
                ),
            )],
            b_terminations=[CableTermination(
                termination=GenericObject(
                    object_interface=Interface(
                        device=b_ref,
                        name=b_port[:64],
                    )
                ),
            )],
            status="connected",
            tags=["unifi", "lldp"],
        )))
        log.debug("Cable: %s:%s <-> %s:%s", a_name, a_port, b_name, b_port)
    log.info("Built %d cable entities from LLDP", len(entities))
    return entities
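# Review note (a sketch, not part of this commit): the dedup step above relies on
# an order-independent key, so a link reported by both ends is emitted once.
# The helper name `dedup_links` and the tuple shape are hypothetical, chosen for
# illustration only. The key matches only if both ends resolve to the same
# device and port names.

```python
def dedup_links(links):
    """Return each undirected link once, whichever end reported it.

    `links` is an iterable of ((device, port), (device, port)) tuples.
    """
    seen = set()
    unique = []
    for a_end, b_end in links:
        key = tuple(sorted([a_end, b_end]))  # same key for A-B and B-A
        if key in seen:
            continue
        seen.add(key)
        unique.append(key)
    return unique


links = [
    (("sw1", "port 1"), ("ap1", "eth0")),
    (("ap1", "eth0"), ("sw1", "port 1")),  # same cable, seen from the AP
    (("sw1", "port 2"), ("ap2", "eth0")),
]
unique_links = dedup_links(links)
```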
# ---------------------------------------------------------------------------
# Network / VLAN entity builder
# ---------------------------------------------------------------------------
def _build_network_entities(net: dict, site_name: str) -> list[Entity]:
"""Build VLAN + Prefix entities from a UniFi network config."""
entities: list[Entity] = []
name = net.get("name", "")
purpose = net.get("purpose", "")
vlan_id = net.get("vlan")
vlan_enabled = net.get("vlan_enabled", False)
subnet = net.get("ip_subnet", "")
enabled = net.get("enabled", True)
if not name:
return entities
# VLAN entity (if VLAN tagging is enabled)
if vlan_enabled and vlan_id:
try:
vid = int(vlan_id)
except (ValueError, TypeError):
vid = None
if vid and 1 <= vid <= 4094:
entities.append(Entity(vlan=VLAN(
vid=vid,
name=name[:64],
group=VLANGroup(name="UniFi"),
site=Site(name=site_name),
status="active" if enabled else "reserved",
tags=["unifi"],
)))
# Prefix entity (if subnet is defined)
if subnet and "/" in subnet:
entities.append(Entity(prefix=Prefix(
prefix=subnet,
scope_site=Site(name=site_name),
status="active",
description=f"UniFi network: {name} ({purpose})"[:200],
tags=["unifi"],
)))
return entities
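# Review note (a sketch under assumptions): if UniFi's ip_subnet carries a host
# address with a prefix length (for example the gateway, "192.168.1.1/24"), the
# value could be normalized to the network address before it is sent as a Prefix.
# Whether NetBox or Diode already normalizes it is not shown in this diff; the
# helper name `normalize_prefix` is hypothetical.

```python
import ipaddress


def normalize_prefix(subnet: str):
    """Return the network in CIDR form, or None if the input does not parse."""
    try:
        return str(ipaddress.ip_interface(subnet).network)
    except ValueError:
        return None
```

# The result could be passed as prefix=... in place of the raw subnet string,
# skipping the Prefix entity when None is returned.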
# ---------------------------------------------------------------------------
# WLAN entity builder
# ---------------------------------------------------------------------------
def _build_wlan_entities(wlan: dict, site_name: str) -> list[Entity]:
"""Build WirelessLAN entities from a UniFi WLAN config."""
entities: list[Entity] = []
name = wlan.get("name", "")
ssid = wlan.get("name", "") # UniFi uses 'name' as the SSID
enabled = wlan.get("enabled", True)
security = wlan.get("security", "")
hide_ssid = wlan.get("hide_ssid", False)
wpa_mode = wlan.get("wpa_mode", "")
wpa3 = wlan.get("wpa3_support", False)
if not ssid:
return entities
# Determine auth type for NetBox
if security == "open":
auth_type = "open"
elif wpa3:
auth_type = "wpa3-personal"
elif "wpa2" in wpa_mode.lower() or security == "wpapsk":
auth_type = "wpa2-personal"
elif "wpa-enterprise" in security.lower() or "wpaeap" in security.lower():
auth_type = "wpa2-enterprise"
else:
auth_type = "wpa2-personal"
# Get VLAN if assigned
vlan_id = None
# UniFi WLANs link to networks via networkconf_id, but we don't have
# easy access to the VLAN ID without cross-referencing. Mark in description.
desc_parts = []
if hide_ssid:
desc_parts.append("Hidden SSID")
if security:
desc_parts.append(f"Security: {security}")
entities.append(Entity(wireless_lan=WirelessLAN(
ssid=ssid,
group=WirelessLANGroup(name="UniFi"),
status="active" if enabled else "disabled",
auth_type=auth_type,
description=", ".join(desc_parts)[:200] if desc_parts else "",
tags=["unifi"],
)))
return entities
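# Review note (a sketch, not the author's method): the auth-type branches above
# test wpa_mode for "wpa2" before they test for enterprise security. If UniFi
# reports wpa_mode "wpa2" for enterprise WLANs too (an assumption), those WLANs
# would be classified as personal. The variant below checks enterprise first.
# The returned strings are copied from the code above and should still be
# checked against the auth types NetBox accepts. `map_auth_type` is a
# hypothetical name.

```python
def map_auth_type(security: str, wpa_mode: str = "", wpa3: bool = False) -> str:
    """Map UniFi WLAN security fields to the auth_type strings used above."""
    security = (security or "").lower()
    # wpa_mode is accepted for parity with the original but not needed here:
    # every non-open, non-enterprise, non-WPA3 case falls back to wpa2-personal.
    if security == "open":
        return "open"
    # Enterprise is checked first so a "wpa2" wpa_mode cannot shadow it.
    if "wpa-enterprise" in security or "wpaeap" in security:
        return "wpa2-enterprise"
    if wpa3:
        return "wpa3-personal"
    return "wpa2-personal"
```

# Note the behavioral difference: an enterprise WLAN with wpa3_support set maps
# to "wpa2-enterprise" here, where the original returns "wpa3-personal".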
# ---------------------------------------------------------------------------
# Ingest
# ---------------------------------------------------------------------------
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
if not entities:
log.warning("No entities to ingest")
return
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
client_id = os.environ.get("DIODE_CLIENT_ID",
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
os.environ.get("INGESTER_CLIENT_SECRET", ""))
if dry_run:
log.info("DRY RUN: %d entities would be ingested", len(entities))
for i, e in enumerate(entities):
log.info(" [%d] %s", i, e)
return
if not client_secret:
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
sys.exit(1)
log.info("Ingesting %d entities to %s ...", len(entities), target)
with DiodeClient(
target=target,
client_id=client_id,
client_secret=client_secret,
app_name="unifi-collector",
app_version="0.1.0",
) as client:
resp = client.ingest(entities=entities)
if resp.errors:
log.error("Ingestion errors: %s", resp.errors)
else:
log.info("Ingestion successful")
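# Review note (a sketch, not part of this commit): the commit log says
# create_message_chunks is not exported by the installed Diode SDK, so the whole
# entity list goes out in a single client.ingest() call. If request size ever
# becomes a problem (an assumption; nothing in this diff shows that it does), a
# plain batching helper would be enough. `batched` and the batch size are
# hypothetical.

```python
from itertools import islice


def batched(items, size):
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


batches = list(batched(range(5), 2))
```

# Each batch could then be sent with client.ingest(entities=batch), checking
# resp.errors per batch.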
def main():
parser = argparse.ArgumentParser(description="UniFi collector for NetBox")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
load_dotenv(args.env_file)
cfg = get_config()
entities = collect_all_entities(cfg)
log.info("Total entities: %d", len(entities))
ingest_entities(entities, dry_run=args.dry_run)
log.info("Done!")
if __name__ == "__main__":
main()

@@ -0,0 +1,544 @@
#!/usr/bin/env python3
"""VMware vSphere collector for NetBox via Diode SDK.

Discovers ESXi hosts, VMs, interfaces, IPs, and disks from a vCenter
or standalone ESXi host and ingests them into NetBox via the Diode pipeline.

Usage:
    python collectors/vmware_collector.py --dry-run
    python collectors/vmware_collector.py
"""
import argparse
import atexit
import logging
import os
import re
import ssl
import sys
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim, vmodl
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
Cluster,
ClusterGroup,
ClusterType,
Device,
DeviceRole,
DeviceType,
Entity,
Interface,
IPAddress,
Manufacturer,
Platform,
Site,
VirtualDisk,
VirtualMachine,
VMInterface,
)
log = logging.getLogger("vmware-collector")
# ---------------------------------------------------------------------------
# Status mappings
# ---------------------------------------------------------------------------
VM_POWER_STATE_MAP = {
"poweredOn": "active",
"poweredOff": "offline",
"suspended": "offline",
}
HOST_STATUS_MAP = {
"green": "active",
"yellow": "active",
"red": "failed",
"gray": "planned",
}
SPEED_TO_TYPE = {
100: "100base-tx",
1000: "1000base-t",
2500: "2.5gbase-t",
10000: "10gbase-x-sfpp",
25000: "25gbase-x-sfp28",
40000: "40gbase-x-qsfpp",
100000: "100gbase-x-qsfp28",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_config() -> dict:
return {
"host": os.environ.get("VCENTER_HOST", ""),
"user": os.environ.get("VCENTER_USER", "administrator@vsphere.local"),
"password": os.environ.get("VCENTER_PASSWORD", ""),
"port": int(os.environ.get("VCENTER_PORT", "443")),
"verify_ssl": os.environ.get("VCENTER_VERIFY_SSL", "false").lower() == "true",
"site": os.environ.get("VCENTER_SITE", "main"),
}
# ---------------------------------------------------------------------------
# Reference helpers
# ---------------------------------------------------------------------------
def _device_ref(name: str, model: str, manufacturer: str, role: str,
site_name: str) -> Device:
return Device(
name=name,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=manufacturer),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
)
def _vm_ref(name: str, cluster_name: str, site_name: str,
role: str = "Virtual Machine") -> VirtualMachine:
return VirtualMachine(
name=name,
site=Site(name=site_name),
cluster=Cluster(
name=cluster_name,
type=ClusterType(name="VMware ESXi"),
scope_site=Site(name=site_name),
),
role=DeviceRole(name=role),
)
# ---------------------------------------------------------------------------
# vSphere connection
# ---------------------------------------------------------------------------
def connect_vsphere(cfg: dict):
    """Connect to vCenter/ESXi and return ServiceInstance."""
    host = cfg["host"]
    if not host:
        log.error("VCENTER_HOST not set")
        sys.exit(1)
    context = None
    if not cfg["verify_ssl"]:
        # Certificate checks are skipped only when VCENTER_VERIFY_SSL is not "true"
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    si = SmartConnect(
        host=host,
        user=cfg["user"],
        pwd=cfg["password"],
        port=cfg["port"],
        sslContext=context,
    )
    atexit.register(Disconnect, si)
    log.info("Connected to vSphere: %s", host)
    return si
def get_all_objects(si, obj_type, folder=None):
    """Get all managed objects of a given type."""
    content = si.RetrieveContent()
    container = content.viewManager.CreateContainerView(
        folder or content.rootFolder, [obj_type], True
    )
    objects = list(container.view)
    container.Destroy()
    return objects
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def build_cluster_entities(si, site_name: str) -> list[Entity]:
"""Build Cluster entities from vSphere clusters."""
entities = []
clusters = get_all_objects(si, vim.ClusterComputeResource)
for cluster in clusters:
dc_name = ""
parent = cluster.parent
while parent:
if isinstance(parent, vim.Datacenter):
dc_name = parent.name
break
parent = getattr(parent, "parent", None)
entities.append(Entity(cluster=Cluster(
name=cluster.name,
type=ClusterType(name="VMware ESXi"),
scope_site=Site(name=site_name),
group=ClusterGroup(name=dc_name) if dc_name else None,
status="active",
tags=["vmware"],
)))
log.info(" Cluster: %s (DC: %s)", cluster.name, dc_name or "none")
return entities
def build_host_entities(si, site_name: str) -> tuple[list[Entity], dict]:
"""Build Device entities from ESXi hosts. Returns entities and host-to-cluster mapping."""
entities = []
host_cluster_map = {}
hosts = get_all_objects(si, vim.HostSystem)
for host in hosts:
hostname = host.name
hw = host.hardware
sys_info = hw.systemInfo if hw else None
model = sys_info.model if sys_info else "Unknown"
vendor = sys_info.vendor if sys_info else "Unknown"
serial = ""
if sys_info:
for ident in (sys_info.otherIdentifyingInfo or []):
if hasattr(ident, "identifierType") and \
ident.identifierType and \
ident.identifierType.key == "ServiceTag":
serial = ident.identifierValue
break
if not serial:
serial = getattr(sys_info, "serialNumber", "") or ""
status = HOST_STATUS_MAP.get(
str(host.overallStatus) if host.overallStatus else "gray",
"active"
)
# Determine cluster
cluster_name = ""
if isinstance(host.parent, vim.ClusterComputeResource):
cluster_name = host.parent.name
host_cluster_map[hostname] = cluster_name
entities.append(Entity(device=Device(
name=hostname,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=vendor),
),
role=DeviceRole(name="Hypervisor"),
platform=Platform(name="VMware ESXi"),
site=Site(name=site_name),
serial=serial[:50] if serial else "",
status=status,
tags=["vmware"],
)))
# Physical NICs
dev_ref = _device_ref(hostname, model, vendor, "Hypervisor", site_name)
if host.config and host.config.network:
for pnic in (host.config.network.pnic or []):
speed_mbps = 0
if pnic.linkSpeed:
speed_mbps = pnic.linkSpeed.speedMb
iface_type = SPEED_TO_TYPE.get(speed_mbps, "1000base-t")
entities.append(Entity(interface=Interface(
device=dev_ref,
name=pnic.device,
type=iface_type,
primary_mac_address=pnic.mac or "",
speed=speed_mbps * 1000 if speed_mbps else 0,
enabled=True,
tags=["vmware"],
)))
# VMkernel interfaces
for vnic in (host.config.network.vnic or []):
ip_str = ""
if vnic.spec and vnic.spec.ip:
ip = vnic.spec.ip.ipAddress
mask = vnic.spec.ip.subnetMask
if ip:
prefix_len = _mask_to_prefix(mask) if mask else 24
ip_str = f"{ip}/{prefix_len}"
entities.append(Entity(interface=Interface(
device=dev_ref,
name=vnic.device,
type="virtual",
primary_mac_address=vnic.spec.mac if vnic.spec else "",
enabled=True,
tags=["vmware", "vmkernel"],
)))
if ip_str:
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=dev_ref,
name=vnic.device,
type="virtual",
),
tags=["vmware"],
)))
log.info(" Host: %s (%s %s, cluster=%s)",
hostname, vendor, model, cluster_name or "standalone")
return entities, host_cluster_map
def build_vm_entities(si, site_name: str,
host_cluster_map: dict) -> list[Entity]:
"""Build VirtualMachine + VMInterface + VirtualDisk + IPAddress entities."""
entities = []
vms = get_all_objects(si, vim.VirtualMachine)
for vm_obj in vms:
try:
vm_name = vm_obj.name
config = vm_obj.config
if not config:
log.debug(" Skipping VM with no config: %s", vm_name)
continue
# Determine cluster from host
host_name = ""
cluster_name = ""
if vm_obj.runtime and vm_obj.runtime.host:
host_name = vm_obj.runtime.host.name
cluster_name = host_cluster_map.get(host_name, "")
if not cluster_name:
cluster_name = host_name or "standalone"
power_state = str(vm_obj.runtime.powerState) if vm_obj.runtime else "poweredOff"
status = VM_POWER_STATE_MAP.get(power_state, "offline")
# Resources
vcpus = config.hardware.numCPU if config.hardware else 0
memory_mb = config.hardware.memoryMB if config.hardware else 0
total_disk_gb = 0
# Determine platform from guest
guest_os = config.guestFullName or config.guestId or ""
platform_name = None
if "linux" in guest_os.lower() or "ubuntu" in guest_os.lower() or \
"centos" in guest_os.lower() or "debian" in guest_os.lower():
platform_name = "Linux"
elif "windows" in guest_os.lower():
platform_name = "Windows"
# Collect IPs for primary_ip4
primary_ip4 = None
vm_ips = []
# Guest NIC info (requires VMware Tools)
if vm_obj.guest and vm_obj.guest.net:
for guest_nic in vm_obj.guest.net:
if guest_nic.ipConfig:
for ip_entry in guest_nic.ipConfig.ipAddress:
addr = ip_entry.ipAddress
prefix = ip_entry.prefixLength
if addr and not addr.startswith("fe80") and \
not addr.startswith("127."):
ip_str = f"{addr}/{prefix}"
nic_name = guest_nic.network or "eth0"
vm_ips.append((ip_str, nic_name))
if not primary_ip4 and ":" not in addr:
primary_ip4 = ip_str
# VirtualMachine entity
vm_kwargs = dict(
name=vm_name,
status=status,
site=Site(name=site_name),
cluster=Cluster(
name=cluster_name,
type=ClusterType(name="VMware ESXi"),
scope_site=Site(name=site_name),
),
role=DeviceRole(name="Virtual Machine"),
vcpus=vcpus,
memory=memory_mb,
comments=f"Guest: {guest_os}" if guest_os else "",
tags=["vmware"],
)
if platform_name:
vm_kwargs["platform"] = Platform(name=platform_name)
if primary_ip4:
vm_kwargs["primary_ip4"] = IPAddress(address=primary_ip4)
entities.append(Entity(virtual_machine=VirtualMachine(**vm_kwargs)))
# VM NICs
vm_ref = _vm_ref(vm_name, cluster_name, site_name)
if config.hardware and config.hardware.device:
for device in config.hardware.device:
if isinstance(device, vim.vm.device.VirtualEthernetCard):
nic_name = device.deviceInfo.label if device.deviceInfo else f"nic{device.key}"
mac = getattr(device, "macAddress", "") or ""
net_name = ""
if hasattr(device, "backing"):
backing = device.backing
if hasattr(backing, "network") and backing.network:
net_name = backing.network.name
elif hasattr(backing, "deviceName"):
net_name = backing.deviceName
entities.append(Entity(vm_interface=VMInterface(
virtual_machine=vm_ref,
name=nic_name[:64],
enabled=device.connectable.connected if device.connectable else True,
primary_mac_address=mac,
description=net_name[:200] if net_name else "",
tags=["vmware"],
)))
# Virtual Disks
elif isinstance(device, vim.vm.device.VirtualDisk):
disk_name = device.deviceInfo.label if device.deviceInfo else f"disk{device.key}"
disk_size_gb = device.capacityInKB // (1024 * 1024) if device.capacityInKB else 0
total_disk_gb += disk_size_gb
if disk_size_gb > 0:
entities.append(Entity(virtual_disk=VirtualDisk(
virtual_machine=vm_ref,
name=disk_name[:64],
size=disk_size_gb,
tags=["vmware"],
)))
# IP entities from guest tools
for ip_str, nic_name in vm_ips:
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_vm_interface=VMInterface(
virtual_machine=vm_ref,
name=nic_name[:64],
),
tags=["vmware"],
)))
log.info(" VM: %s (%s, %d vCPU, %d MB RAM, %d GB disk)",
vm_name, status, vcpus, memory_mb, total_disk_gb)
except Exception as exc:
log.error(" Failed to process VM %s: %s",
getattr(vm_obj, "name", "?"), exc)
return entities
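# Review note (a sketch under assumptions): the IP entities above are attached to
# a VMInterface named after guest_nic.network, while the VMInterface entities
# themselves are named after the hardware device label. If those two names
# differ, the IPs may point at an interface name that no VMInterface carries.
# Assuming the guest NIC entry also exposes its MAC address, matching by MAC
# would tie each IP to the interface built from the hardware list. Both helper
# names below are hypothetical.

```python
def build_mac_to_label(nics):
    """Map lower-cased MAC -> device label from (mac, label) pairs."""
    return {mac.lower(): label for mac, label in nics if mac}


def interface_name_for(guest_mac, mac_to_label, fallback="eth0"):
    """Return the device label for a guest NIC's MAC, or the fallback."""
    return mac_to_label.get((guest_mac or "").lower(), fallback)


mac_to_label = build_mac_to_label([
    ("00:50:56:AA:BB:CC", "Network adapter 1"),
    ("", "Network adapter 2"),  # NICs without a MAC are skipped
])
```

# In build_vm_entities this would mean collecting (macAddress, label) pairs
# before the guest-NIC loop and using the looked-up label as nic_name.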
def _mask_to_prefix(mask: str) -> int:
    """Convert subnet mask to prefix length."""
    try:
        return sum(bin(int(x)).count("1") for x in mask.split("."))
    except (ValueError, AttributeError):
        return 24
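# Review note (an alternative sketch, not the author's method): counting set bits
# accepts non-contiguous masks such as 255.0.255.0 and reports 16 for them. The
# standard-library ipaddress module rejects such masks, so a stricter variant
# could look like this. `mask_to_prefix_strict` and its `default` parameter are
# hypothetical.

```python
import ipaddress


def mask_to_prefix_strict(mask: str, default: int = 24) -> int:
    """Convert a dotted-quad netmask to a prefix length, validating the mask."""
    try:
        return ipaddress.IPv4Network(f"0.0.0.0/{mask}").prefixlen
    except (ValueError, TypeError):
        return default
```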
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def collect_all_entities(cfg: dict) -> list[Entity]:
si = connect_vsphere(cfg)
site_name = cfg["site"]
entities: list[Entity] = []
# Clusters
entities.extend(build_cluster_entities(si, site_name))
# ESXi hosts + interfaces
host_entities, host_cluster_map = build_host_entities(si, site_name)
entities.extend(host_entities)
# VMs
entities.extend(build_vm_entities(si, site_name, host_cluster_map))
return entities
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
if not entities:
log.warning("No entities to ingest")
return
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
client_id = os.environ.get("DIODE_CLIENT_ID",
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
os.environ.get("INGESTER_CLIENT_SECRET", ""))
if dry_run:
log.info("DRY RUN: %d entities would be ingested", len(entities))
for i, e in enumerate(entities):
log.info(" [%d] %s", i, e)
return
if not client_secret:
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
sys.exit(1)
log.info("Ingesting %d entities to %s ...", len(entities), target)
with DiodeClient(
target=target,
client_id=client_id,
client_secret=client_secret,
app_name="vmware-collector",
app_version="0.1.0",
) as client:
resp = client.ingest(entities=entities)
if resp.errors:
log.error("Ingestion errors: %s", resp.errors)
else:
log.info("Ingestion successful")
def main():
parser = argparse.ArgumentParser(description="VMware vSphere collector for NetBox")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
load_dotenv(args.env_file)
cfg = get_config()
entities = collect_all_entities(cfg)
log.info("Total entities: %d", len(entities))
ingest_entities(entities, dry_run=args.dry_run)
log.info("Done!")
if __name__ == "__main__":
main()

@@ -0,0 +1,372 @@
#!/usr/bin/env python3
"""Zabbix collector for NetBox via Diode SDK.

Pulls device inventory from Zabbix API for brownfield import into NetBox.
Also adds Zabbix host IDs as custom fields for cross-referencing.

Usage:
    python collectors/zabbix_collector.py --dry-run
    python collectors/zabbix_collector.py
"""
import argparse
import logging
import os
import sys
from pyzabbix import ZabbixAPI
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
CustomFieldValue,
Device,
DeviceRole,
DeviceType,
Entity,
Interface,
IPAddress,
Manufacturer,
Platform,
Site,
)
log = logging.getLogger("zabbix-collector")
# ---------------------------------------------------------------------------
# Zabbix → NetBox mappings
# ---------------------------------------------------------------------------
ZABBIX_STATUS_MAP = {
0: "active", # Monitored
1: "offline", # Unmonitored
}
ZABBIX_IFACE_TYPE = {
1: "agent", # Zabbix agent
2: "snmp", # SNMP
3: "ipmi", # IPMI
4: "jmx", # JMX
}
# Best-effort OS → Platform mapping from Zabbix template groups
OS_KEYWORDS_TO_PLATFORM = {
"linux": "Linux",
"windows": "Windows",
"cisco": "Cisco IOS",
"juniper": "Juniper Junos",
"freebsd": "FreeBSD",
"vmware": "VMware ESXi",
"proxmox": "Proxmox VE",
"ubuntu": "Ubuntu",
"debian": "Debian",
"centos": "CentOS",
"rhel": "Red Hat Enterprise Linux",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_config() -> dict:
return {
"url": os.environ.get("ZABBIX_URL", ""),
"user": os.environ.get("ZABBIX_USER", "Admin"),
"password": os.environ.get("ZABBIX_PASSWORD", ""),
"api_token": os.environ.get("ZABBIX_API_TOKEN", ""),
"site": os.environ.get("ZABBIX_SITE", "main"),
"default_role": os.environ.get("ZABBIX_DEFAULT_ROLE", "Server"),
"group_to_role": {}, # Could be loaded from config
}
# ---------------------------------------------------------------------------
# Device reference helper
# ---------------------------------------------------------------------------
def _device_ref(name: str, model: str, manufacturer: str, role: str,
site_name: str) -> Device:
return Device(
name=name,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=manufacturer),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
)
# ---------------------------------------------------------------------------
# Data collection
# ---------------------------------------------------------------------------
def connect_zabbix(cfg: dict) -> ZabbixAPI:
    url = cfg["url"]
    if not url:
        log.error("ZABBIX_URL not set")
        sys.exit(1)
    zapi = ZabbixAPI(url)
    # NOTE: TLS certificate verification is disabled unconditionally here;
    # unlike the VMware collector there is no env var to turn it back on.
    zapi.session.verify = False
    if cfg.get("api_token"):
        zapi.login(api_token=cfg["api_token"])
    else:
        zapi.login(cfg["user"], cfg["password"])
    log.info("Connected to Zabbix %s", zapi.api_version())
    return zapi
def collect_hosts(zapi: ZabbixAPI) -> list[dict]:
"""Fetch all hosts with their interfaces and inventory data."""
hosts = zapi.host.get(
output=["hostid", "host", "name", "status", "description"],
selectInterfaces=["interfaceid", "ip", "dns", "port", "type", "main", "useip"],
selectInventory="extend",
selectGroups=["name"],
selectParentTemplates=["name"],
)
log.info("Fetched %d hosts from Zabbix", len(hosts))
return hosts
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def guess_platform(host_data: dict) -> str | None:
"""Try to guess platform from Zabbix templates/groups/inventory."""
# Check inventory OS field
inventory = host_data.get("inventory") or {}
if isinstance(inventory, dict):
os_full = inventory.get("os_full") or inventory.get("os_short") or ""
for keyword, platform in OS_KEYWORDS_TO_PLATFORM.items():
if keyword.lower() in os_full.lower():
return platform
# Check template names
templates = host_data.get("parentTemplates") or []
for tmpl in templates:
tmpl_name = (tmpl.get("name") or "").lower()
for keyword, platform in OS_KEYWORDS_TO_PLATFORM.items():
if keyword in tmpl_name:
return platform
return None
def guess_role(host_data: dict, default_role: str) -> str:
    """Try to guess device role from Zabbix groups."""
    groups = host_data.get("groups") or []
    for group in groups:
        gname = (group.get("name") or "").lower()
        if "router" in gname:
            return "Router"
        if "switch" in gname:
            return "Switch"
        if "firewall" in gname:
            return "Firewall"
        if "hypervisor" in gname:
            return "Hypervisor"
        if "server" in gname or "linux" in gname or "windows" in gname:
            return "Server"
    return default_role
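# Review note (a sketch, not part of this commit): the keyword checks above could
# be expressed as an ordered table, which would also make it easier to wire up
# the currently empty "group_to_role" config entry later. The names
# ROLE_KEYWORDS and guess_role_from_groups are hypothetical; the keywords and
# roles are copied from guess_role.

```python
ROLE_KEYWORDS = [
    ("router", "Router"),
    ("switch", "Switch"),
    ("firewall", "Firewall"),
    ("hypervisor", "Hypervisor"),
    ("server", "Server"),
    ("linux", "Server"),
    ("windows", "Server"),
]


def guess_role_from_groups(group_names, default_role="Server"):
    """Return the role for the first group that matches any keyword."""
    for gname in group_names:
        lowered = (gname or "").lower()
        for keyword, role in ROLE_KEYWORDS:
            if keyword in lowered:
                return role
    return default_role
```

# As in guess_role, the first matching group wins, so group order matters.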
def build_host_entities(host_data: dict, cfg: dict) -> list[Entity]:
"""Build Device + Interface + IPAddress entities from a Zabbix host."""
entities = []
site_name = cfg["site"]
default_role = cfg["default_role"]
hostname = host_data.get("name") or host_data.get("host", "unknown")
status = ZABBIX_STATUS_MAP.get(int(host_data.get("status", 0)), "active")
host_id = host_data.get("hostid", "")
# Inventory data
inventory = host_data.get("inventory") or {}
if isinstance(inventory, list):
inventory = {}
serial = inventory.get("serialno_a") or ""
model = inventory.get("model") or inventory.get("hardware") or "Unknown"
vendor = inventory.get("vendor") or inventory.get("hardware_full") or "Unknown"
asset_tag = inventory.get("asset_tag") or ""
role = guess_role(host_data, default_role)
platform = guess_platform(host_data)
# Custom fields for cross-referencing
custom_fields = {}
if host_id:
custom_fields["zabbix_host_id"] = CustomFieldValue(text=str(host_id))
# Device entity
device_kwargs = dict(
name=hostname,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name=vendor),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
status=status,
serial=serial[:50] if serial else "",
asset_tag=asset_tag[:50] if asset_tag else "",
tags=["zabbix"],
)
if platform:
device_kwargs["platform"] = Platform(name=platform)
if custom_fields:
device_kwargs["custom_fields"] = custom_fields
entities.append(Entity(device=Device(**device_kwargs)))
# Interface + IP entities from Zabbix interfaces
dev_ref = _device_ref(hostname, model, vendor, role, site_name)
primary_ip = None
for iface in host_data.get("interfaces", []):
iface_type_num = int(iface.get("type", 1))
iface_type_name = ZABBIX_IFACE_TYPE.get(iface_type_num, "agent")
ip = iface.get("ip", "")
is_main = str(iface.get("main", "0")) == "1"
if not ip or ip == "0.0.0.0":
continue
# Interface name based on type
iface_name = f"zabbix-{iface_type_name}"
if is_main and iface_type_name == "agent":
iface_name = "mgmt0"
elif iface_type_name == "snmp":
iface_name = "snmp0"
elif iface_type_name == "ipmi":
iface_name = "ipmi0"
entities.append(Entity(interface=Interface(
device=dev_ref,
name=iface_name,
type="other",
enabled=True,
tags=["zabbix"],
)))
# IP address
ip_str = f"{ip}/32" # Zabbix doesn't provide prefix length
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=dev_ref,
name=iface_name,
type="other",
),
tags=["zabbix"],
)))
if is_main and primary_ip is None:
primary_ip = ip_str
return entities
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def collect_all_entities(cfg: dict) -> list[Entity]:
zapi = connect_zabbix(cfg)
hosts = collect_hosts(zapi)
entities: list[Entity] = []
for host in hosts:
try:
entities.extend(build_host_entities(host, cfg))
except Exception as exc:
hostname = host.get("name") or host.get("host", "?")
log.error("Failed to process Zabbix host %s: %s", hostname, exc)
return entities
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
if not entities:
log.warning("No entities to ingest")
return
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
client_id = os.environ.get("DIODE_CLIENT_ID",
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
os.environ.get("INGESTER_CLIENT_SECRET", ""))
if dry_run:
log.info("DRY RUN: %d entities would be ingested", len(entities))
for i, e in enumerate(entities):
log.info(" [%d] %s", i, e)
return
if not client_secret:
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
sys.exit(1)
log.info("Ingesting %d entities to %s ...", len(entities), target)
with DiodeClient(
target=target,
client_id=client_id,
client_secret=client_secret,
app_name="zabbix-collector",
app_version="0.1.0",
) as client:
resp = client.ingest(entities=entities)
if resp.errors:
log.error("Ingestion errors: %s", resp.errors)
else:
log.info("Ingestion successful")
def main():
parser = argparse.ArgumentParser(description="Zabbix collector for NetBox")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
load_dotenv(args.env_file)
cfg = get_config()
entities = collect_all_entities(cfg)
log.info("Total entities: %d", len(entities))
ingest_entities(entities, dry_run=args.dry_run)
log.info("Done!")
if __name__ == "__main__":
main()