Add PBS collector, multi-host PVE support, and collector fixes

- proxmox_collector: support numbered PVE_HOST_1/2/3 env vars with
  backward compat for legacy single PVE_HOST; fix MTU string-to-int cast
- pbs_collector: new collector for Proxmox Backup Server — discovers
  devices, interfaces, IPs, and datastores (as Services) via PBS API
- vmware_collector: fix mac_address → primary_mac_address for Diode SDK
- network_collector: add Netmiko SSH fallback for Brocade/NOS devices,
  add Brocade ICX interface type patterns
- unifi_collector: new collector for UniFi UDM-SE/switches/APs
- ENV_REFERENCE.md: document all collector env vars and setup steps
- .gitignore: exclude collectors/inventory.yaml (contains credentials)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
sam 2026-02-28 16:10:12 -07:00
parent b4fcdfa277
commit 5748bad765
7 changed files with 2227 additions and 58 deletions

1
.gitignore vendored
View File

@ -6,3 +6,4 @@ __pycache__/
# Generated by setup.sh (contain secrets)
oauth2/client/client-credentials.json
orb-agent/agent.yaml
collectors/inventory.yaml

245
collectors/ENV_REFERENCE.md Normal file
View File

@ -0,0 +1,245 @@
# Environment Variables Reference
All variables go in `.env` at the project root. Variables marked **[HAVE]**
are already configured. Variables marked **[NEED]** are what you need to gather.
---
## Diode Pipeline [HAVE]
Already configured and working.
```bash
INGESTER_CLIENT_ID=diode-ingester
INGESTER_CLIENT_SECRET=<already set>
NETBOX_API_URL=http://172.19.77.160:8000
NETBOX_API_TOKEN=nbt_<already set>
```
## Proxmox VE Collector [HAVE — partial]
Supports multiple standalone PVE hosts. prox940 already configured.
```bash
# Legacy single-host (still works)
PVE_HOST=192.168.1.190
PVE_USER=root@pam
PVE_TOKEN_NAME=diode
PVE_TOKEN_VALUE=<already set>
PVE_VERIFY_SSL=false
# Additional PVE hosts (numbered)
PVE_HOST_2=10.40.40.107 # proxmox2
PVE_USER_2=diode@pve
PVE_TOKEN_NAME_2=diode
PVE_TOKEN_VALUE_2= # NEED — create token on proxmox2
PVE_HOST_3=10.40.40.110 # proxmox3
PVE_USER_3=diode@pve
PVE_TOKEN_NAME_3=diode
PVE_TOKEN_VALUE_3= # NEED — create token on proxmox3
```
**Setup on each PVE host:**
```bash
pveum user add diode@pve --comment "Diode NetBox collector"
pveum aclmod / -user diode@pve -role PVEAuditor
pveum user token add diode@pve diode --privsep 0 --comment "NetBox Diode"
```
## Proxmox Backup Server Collector [NEED]
```bash
PBS_HOST_1=10.40.40.150 # PBS-01
PBS_USER_1=diode@pbs
PBS_TOKEN_NAME_1=diode
PBS_TOKEN_VALUE_1= # NEED — create token on PBS-01
PBS_HOST_2=192.168.1.241 # PBS-02
PBS_USER_2=diode@pbs
PBS_TOKEN_NAME_2=diode
PBS_TOKEN_VALUE_2= # NEED — create token on PBS-02
PBS_HOST_3=pbs.apodacalabs.com # PBS
PBS_USER_3=diode@pbs
PBS_TOKEN_NAME_3=diode
PBS_TOKEN_VALUE_3= # NEED — create token on PBS
```
**Setup on each PBS host:**
```bash
proxmox-backup-manager user create diode@pbs --comment "Diode NetBox collector"
proxmox-backup-manager acl update / Audit --auth-id diode@pbs
proxmox-backup-manager user generate-token diode@pbs diode
```
---
## Network Collector [NEED]
Credentials go in `collectors/inventory.yaml`, not `.env`.
Only these optional vars go in `.env`:
```bash
# Optional: skip pyATS even if installed (run with --no-pyats flag instead)
# No env vars strictly required — everything is in inventory.yaml
```
## CML Topology Collector [NEED]
```bash
CML_HOST= # CML controller IP or hostname (e.g., 10.40.40.50)
CML_USER=admin # CML admin username
CML_PASSWORD= # CML admin password
CML_LAB= # Optional: specific lab name/ID (blank = all labs)
CML_VERIFY_SSL=false # Set true if CML has valid TLS cert
CML_SITE=CML # NetBox site name for CML devices (default: CML)
```
**Setup on CML side:** Just need the controller address and admin creds.
The virl2_client library handles the REST API.
## Zabbix Collector [NEED]
```bash
ZABBIX_URL= # Full URL to API (e.g., http://10.40.40.20/api_jsonrpc.php)
ZABBIX_USER=Admin # Zabbix username
ZABBIX_PASSWORD= # Zabbix password
ZABBIX_API_TOKEN= # OR use an API token instead of user/pass (Zabbix 5.4+)
ZABBIX_SITE=main # NetBox site to assign devices to
ZABBIX_DEFAULT_ROLE=Server # Default role if group-based detection fails
```
**Setup on Zabbix side:** No setup needed — just need read access creds.
If using API token (Zabbix 5.4+): Administration → API tokens → Create.
## Observium Collector [NEED]
```bash
OBSERVIUM_URL= # API base URL (e.g., http://10.40.40.30/api/v0)
OBSERVIUM_USER=admin # Observium username
OBSERVIUM_PASSWORD= # Observium password
OBSERVIUM_SITE=main # NetBox site to assign devices to
OBSERVIUM_DEFAULT_ROLE=Network Device
```
**NOTE:** Observium REST API requires Professional or Enterprise edition.
Community Edition does not expose a REST API. If you're on Community,
skip this collector (Zabbix can cover similar ground).
## VMware Collector [NEED]
```bash
VCENTER_HOST= # vCenter or ESXi IP/hostname
VCENTER_USER=administrator@vsphere.local
VCENTER_PASSWORD= # vCenter/ESXi password
VCENTER_PORT=443 # API port (default: 443)
VCENTER_VERIFY_SSL=false # Set true if valid TLS cert
VCENTER_SITE=main # NetBox site to assign devices to
```
**Setup on vCenter side:** Just need a read-only account.
Minimum role: Read-only → Assign at vCenter root.
## Docker Collector [NEED — only if remote hosts]
Works immediately for local Docker (no env vars needed).
For remote Docker hosts:
```bash
DOCKER_HOSTS= # Comma-separated (e.g., tcp://10.0.0.5:2375,tcp://10.0.0.6:2375)
DOCKER_SITE=main # NetBox site
DOCKER_TLS_VERIFY=false # Set true if Docker TLS is configured
```
**Setup on remote Docker hosts:** Enable TCP API:
`dockerd -H unix:///var/run/docker.sock -H tcp://0.0.0.0:2375`
Or use TLS: https://docs.docker.com/engine/security/protect-access/
## UniFi Collector [NEED]
Discovers UDM-SE, switches, and APs from the local UniFi Controller API.
```bash
UNIFI_HOST=192.168.1.1 # UDM-SE / Controller IP (or hostname)
UNIFI_USER= # UniFi local admin username
UNIFI_PASSWORD= # UniFi local admin password
UNIFI_SITE=default # UniFi site name (usually "default")
UNIFI_VERIFY_SSL=false # UDM-SE uses self-signed cert by default
UNIFI_IS_UDM=true # true for UDM/UDM-SE/UDR, false for legacy controller
UNIFI_NETBOX_SITE=main # NetBox site to assign devices to
```
**What it discovers:**
- UDM-SE, switches, APs as Devices with model/serial/firmware
- Switch ports with speed, PoE, SFP detection
- WiFi radios with band/channel/power
- VLANs and subnets from network configurations
- WLANs (SSIDs) with auth type
- LLDP neighbors → Cables for topology mapping
**Setup on UDM-SE:** Just need a local admin account.
The API is built-in — no additional setup required.
## NAPALM Plugin (live device status in NetBox UI) [NEED]
These go in the NetBox Docker env, not the project `.env`.
Add to `/home/user/netbox-docker/env/netbox.env`:
```bash
NAPALM_USERNAME=admin # Same SSH creds as your network devices
NAPALM_PASSWORD= # SSH password
```
Then assign NAPALM drivers to Platforms in NetBox:
Devices → Platforms → edit each platform → set NAPALM driver
(e.g., Platform "Cisco IOS" → NAPALM driver: ios)
---
## Quick Checklist
| Collector | What to gather | Priority |
|-----------|---------------|----------|
| Network | SSH creds for routers/switches, fill in inventory.yaml | HIGH |
| CML | Controller IP + admin creds | HIGH |
| Zabbix | API URL + creds or API token | MEDIUM |
| Observium | API URL + creds (needs paid edition) | LOW |
| VMware | vCenter IP + read-only account | MEDIUM |
| Docker | Nothing (local works), or remote TCP URLs | LOW |
| Proxmox VE | Token per host (diode@pve + API token) | HIGH |
| PBS | Token per host (diode@pbs + API token) | HIGH |
| UniFi | UDM-SE IP + local admin creds | HIGH |
| NAPALM | SSH creds in netbox.env + assign drivers to platforms | MEDIUM |
## Testing Each Collector
All collectors support `--dry-run` for safe testing:
```bash
# Activate venv first
source .venv/bin/activate
# Network devices (highest value — discovers cables + topology)
python collectors/network_collector.py -i collectors/inventory.yaml --dry-run
# CML topology
python collectors/cml_collector.py --dry-run
# Zabbix import
python collectors/zabbix_collector.py --dry-run
# Observium import
python collectors/observium_collector.py --dry-run
# VMware
python collectors/vmware_collector.py --dry-run
# Docker (works immediately)
python collectors/docker_collector.py --dry-run
# UniFi (UDM-SE + APs)
python collectors/unifi_collector.py --dry-run
# Proxmox VE (multi-host — already tested)
python collectors/proxmox_collector.py --dry-run
# Proxmox Backup Server
python collectors/pbs_collector.py --dry-run
```
Add `--log-level DEBUG` to any command for verbose output.

View File

@ -63,8 +63,22 @@ try:
except ImportError:
HAS_REQUESTS = False
# Netmiko for devices without NAPALM drivers (e.g. Brocade VDX/NOS)
from netmiko import ConnectHandler
log = logging.getLogger("network-collector")
# Drivers that should use Netmiko SSH fallback instead of NAPALM.
# napalm-ruckus-fastiron installs but fails on open(); NOS has no NAPALM driver.
# Keys = inventory driver names, values = Netmiko device_type strings.
NETMIKO_ONLY_DRIVERS = {
"ruckus_fastiron": "ruckus_fastiron",
"nos": "extreme_nos",
"brocade_nos": "extreme_nos",
"brocade_vdx": "extreme_nos",
"extreme_nos": "extreme_nos",
}
# ---------------------------------------------------------------------------
# Interface type mapping
# ---------------------------------------------------------------------------
@ -98,6 +112,13 @@ NAME_TO_TYPE = {
r"^(Po|Port-channel|port-channel)": "lag",
r"^(BV|BVI)": "bridge",
r"^(Se|Serial)": "other",
# Brocade ICX / FastIron patterns
r"^e\s?\d": "1000base-t", # e 1/1/1
r"^ethernet\s?\d": "1000base-t", # ethernet 1/1/1
r"^(ve|VirtualEthernet)": "virtual", # ve 10
r"^(lb|loopback)\s?\d": "virtual", # lb 1
r"^(lag|trunk)\s?\d": "lag", # lag 1
r"^management\s?\d": "1000base-t", # management 1
}
DRIVER_TO_PLATFORM = {
@ -107,6 +128,11 @@ DRIVER_TO_PLATFORM = {
"junos": "Juniper Junos",
"nxos": "Cisco NX-OS",
"nxos_ssh": "Cisco NX-OS",
"ruckus_fastiron": "Ruckus FastIron",
"nos": "Brocade NOS",
"brocade_nos": "Brocade NOS",
"brocade_vdx": "Brocade NOS",
"extreme_nos": "Brocade NOS",
}
DRIVER_TO_MANUFACTURER = {
@ -116,6 +142,11 @@ DRIVER_TO_MANUFACTURER = {
"junos": "Juniper",
"nxos": "Cisco",
"nxos_ssh": "Cisco",
"ruckus_fastiron": "Brocade",
"nos": "Brocade",
"brocade_nos": "Brocade",
"brocade_vdx": "Brocade",
"extreme_nos": "Brocade",
}
@ -211,6 +242,8 @@ def normalize_interface_name(name: str) -> str:
for short, long in abbrevs.items():
if name.startswith(short) and not name.startswith(long):
rest = name[len(short):]
# Handle both "Fo1/0/1" and "Fo 1/0/1" (NOS uses spaces)
rest = rest.lstrip()
if rest and (rest[0].isdigit() or rest[0] == "/"):
return long + rest
return name
@ -364,6 +397,594 @@ def collect_pyats_data(host: str, driver: str, username: str, password: str,
return data
# ---------------------------------------------------------------------------
# Netmiko fallback collection (devices without NAPALM drivers)
# ---------------------------------------------------------------------------
def connect_netmiko(host: str, driver: str, username: str, password: str,
secret: str = "", timeout: int = 60) -> ConnectHandler:
"""Connect to a device via Netmiko and return the connection handler."""
device_type = NETMIKO_ONLY_DRIVERS.get(driver, driver)
params = {
"device_type": device_type,
"host": host,
"username": username,
"password": password,
"timeout": timeout,
"conn_timeout": timeout,
}
if secret:
params["secret"] = secret
conn = ConnectHandler(**params)
return conn
def collect_netmiko_data(conn: ConnectHandler, driver: str) -> dict:
"""Collect device data via Netmiko CLI commands and parse output.
Returns a dict shaped like collect_napalm_data() output so the same
entity builders work for both paths.
"""
data = {}
# --- Facts ---
facts = _netmiko_parse_facts(conn, driver)
if not facts:
return data
data["facts"] = facts
log.info(" Facts: %s (%s)", facts.get("hostname"), facts.get("model"))
# --- Interfaces ---
try:
data["interfaces"] = _netmiko_parse_interfaces(conn, driver)
log.debug(" Interfaces: %d", len(data["interfaces"]))
except Exception as exc:
log.warning(" Interface collection failed: %s", exc)
# --- Interface IPs ---
try:
data["interfaces_ip"] = _netmiko_parse_interfaces_ip(conn, driver)
log.debug(" Interface IPs: %d interfaces with IPs", len(data["interfaces_ip"]))
except Exception as exc:
log.warning(" IP collection failed: %s", exc)
# --- LLDP neighbors ---
try:
data["lldp_neighbors"] = _netmiko_parse_lldp(conn, driver)
total = sum(len(v) for v in data["lldp_neighbors"].values())
log.debug(" LLDP neighbors: %d", total)
except Exception as exc:
log.debug(" LLDP collection failed: %s", exc)
# --- Running config ---
try:
running = conn.send_command("show running-config", read_timeout=120)
data["config"] = {
"running": running,
"startup": "",
"candidate": "",
}
log.debug(" Config: running=%d bytes", len(running))
except Exception as exc:
log.warning(" Config collection failed: %s", exc)
return data
def _netmiko_parse_facts(conn: ConnectHandler, driver: str) -> dict:
"""Parse device facts from Netmiko CLI output.
Handles both ICX/FastIron and NOS/VDX output formats.
"""
facts = {
"hostname": "",
"model": "Unknown",
"vendor": "Brocade",
"serial_number": "",
"os_version": "",
"fqdn": "",
"uptime": -1,
"interface_list": [],
}
try:
# Hostname from prompt (works for both ICX and NOS)
# ICX: "SSH@Brocade40G-01#" → "Brocade40G-01"
# NOS: "Brocade-VDX-6940-01#" → "Brocade-VDX-6940-01"
prompt = conn.find_prompt().strip().rstrip("#>")
# ICX prepends "SSH@" to the prompt
if prompt.startswith("SSH@"):
prompt = prompt[4:]
if prompt:
facts["hostname"] = prompt
# 'show version' — works on both ICX and NOS (different formats)
ver_out = conn.send_command("show version")
for line in ver_out.splitlines():
stripped = line.strip()
low = stripped.lower()
# ICX: " HW: Stackable ICX6610-24"
if low.startswith("hw:"):
model = stripped.split(":", 1)[1].strip()
# Remove "Stackable " prefix
model = re.sub(r"^Stackable\s+", "", model)
facts["model"] = model
# ICX: " Serial #: 2ax5o2jk68e"
elif "serial" in low and "#:" in low:
sn = stripped.split("#:", 1)[1].strip()
if sn:
facts["serial_number"] = sn
# ICX: " SW: Version 08.0.30uT7f3"
elif low.startswith("sw:") and "version" in low:
ver = stripped.split("Version", 1)[-1].strip() if "Version" in stripped else stripped.split(":", 1)[1].strip()
facts["os_version"] = ver
# NOS: "Firmware name: ..." or "Network Operating System ..."
elif "firmware name:" in low or "network operating system" in low:
facts["os_version"] = stripped.split(":", 1)[-1].strip() if ":" in stripped else stripped
# NOS: "Firmware version: ..."
elif "firmware version:" in low:
facts["os_version"] = stripped.split(":", 1)[1].strip()
# Try 'show chassis' for NOS (ICX may not have this or it returns different data)
try:
chassis_out = conn.send_command("show chassis")
for line in chassis_out.splitlines():
low = line.strip().lower()
# NOS: "Chassis Name: VDX6940-36Q" or "Switch Type: ..."
if ("chassis name:" in low or "switchtype:" in low
or "switch type:" in low):
val = line.split(":", 1)[1].strip()
if val and val.lower() not in ("", "unknown"):
facts["model"] = val
# NOS: "Chassis Serial Number: ..." or "Serial Number: ..."
elif "serial number:" in low and not facts["serial_number"]:
sn = line.split(":", 1)[1].strip()
if sn:
facts["serial_number"] = sn
except Exception:
pass # ICX may error on 'show chassis'
# Fallback: get model from running config's chassis-name
if facts["model"] in ("Unknown", "") or facts["model"].isdigit():
try:
cfg_out = conn.send_command("show running-config", read_timeout=120)
for line in cfg_out.splitlines():
stripped = line.strip()
if stripped.lower().startswith("chassis-name "):
val = stripped.split(None, 1)[1].strip()
if val:
facts["model"] = val
break
except Exception:
pass
# Try to get hostname from running config (ICX uses 'hostname', NOS uses 'switch-name')
try:
host_out = conn.send_command("show running-config | include hostname|switch-name|host-name")
for line in host_out.splitlines():
stripped = line.strip()
low = stripped.lower()
if (low.startswith("hostname ") or "switch-name" in low
or "host-name" in low):
parts = stripped.split()
if len(parts) >= 2:
facts["hostname"] = parts[-1].strip('"')
except Exception:
pass
# Build interface list from 'show interface brief'
try:
iface_out = conn.send_command("show interface brief")
if "syntax error" not in iface_out.lower():
for line in iface_out.splitlines():
parts = line.split()
if not parts:
continue
# ICX: ports like "1/1/1", "1/2/1", "mgmt1"
# NOS: ports like "Te 0/0", "Gi 0/1", "Ve 10"
if re.match(r"^\d+/\d+/\d+", parts[0]):
facts["interface_list"].append(parts[0])
elif parts[0].startswith(("Te", "Gi", "Fo", "Hu", "Eth", "mgmt",
"Ve", "Lo", "Po", "te", "gi", "fo")):
facts["interface_list"].append(parts[0])
except Exception:
pass
except Exception as exc:
log.error(" Facts parsing failed: %s", exc)
return {}
return facts
def _netmiko_parse_interfaces(conn: ConnectHandler, driver: str) -> dict:
"""Parse interface data from CLI output.
Handles both ICX/FastIron and NOS/VDX formats.
Returns dict matching NAPALM get_interfaces() format:
{iface_name: {is_up, is_enabled, description, mac_address, speed, mtu, last_flapped}}
"""
interfaces = {}
is_icx = driver in ("ruckus_fastiron",)
# --- Try brief output first (ICX only; NOS errors on this) ---
brief_output = ""
try:
brief_output = conn.send_command("show interfaces brief")
if "syntax error" in brief_output.lower() or "invalid" in brief_output.lower():
brief_output = ""
except Exception:
pass
if not brief_output:
try:
brief_output = conn.send_command("show interface brief")
if "syntax error" in brief_output.lower() or "invalid" in brief_output.lower():
brief_output = ""
except Exception:
pass
if brief_output and is_icx:
for line in brief_output.splitlines():
parts = line.split()
if not parts:
continue
# ICX format:
# Port Link State Dupl Speed Trunk Tag Pvid Pri MAC Name
# 1/1/1 Up Forward Full 1G None No 20 0 cc4e.2413.2562
if not re.match(r"^\d+/\d+/\d+|^mgmt\d*|^lb\d*|^ve\d*", parts[0]):
continue
name = parts[0]
is_up = len(parts) > 1 and parts[1].lower() == "up"
speed = 0
mac = ""
for part in parts:
# Speed: "1G", "10G", "40G", "100G", "100M"
speed_m = re.match(r"^(\d+)([GM])$", part)
if speed_m:
val = int(speed_m.group(1))
speed = val * 1000 if speed_m.group(2) == "G" else val # → Mbps
# MAC: cc4e.2413.2562
if re.match(r"^[0-9a-f]{4}\.[0-9a-f]{4}\.[0-9a-f]{4}$", part, re.IGNORECASE):
mac = part
interfaces[name] = {
"is_up": is_up,
"is_enabled": True,
"description": "",
"mac_address": mac,
"speed": speed,
"mtu": 0,
"last_flapped": -1.0,
}
# --- Parse full 'show interface' detail output ---
# For NOS/VDX this is the primary source (brief doesn't exist).
# For ICX this enriches the brief data with descriptions, MACs, MTUs.
try:
detail_out = conn.send_command("show interface", read_timeout=120)
current_iface = None
for line in detail_out.splitlines():
# Header line pattern (both ICX and NOS):
# NOS: "FortyGigabitEthernet 1/0/1 is up, line protocol is up (connected)"
# NOS: "TenGigabitEthernet 1/0/34:1 is up, line protocol is up"
# NOS: "Management 1/0 is up, line protocol is up"
# ICX: "GigabitEthernet1/1/1 is up, line protocol is up"
iface_hdr = re.match(
r"^((?:TenGigabit|Gigabit|FortyGigabit|HundredGigabit)?Ethernet|"
r"Management|Loopback|Port-channel|Ve)\s*(\S+)\s+is\s+(up|down|administratively down)",
line,
)
if iface_hdr:
iface_type_str = iface_hdr.group(1)
iface_id = iface_hdr.group(2)
link_state = iface_hdr.group(3)
full_name = f"{iface_type_str} {iface_id}"
# Abbreviate NOS long names for consistency with LLDP
abbrev_map = {
"FortyGigabitEthernet": "Fo",
"TenGigabitEthernet": "Te",
"GigabitEthernet": "Gi",
"HundredGigabitEthernet": "Hu",
"Management": "Management",
"Loopback": "Lo",
"Port-channel": "Po",
"Ve": "Ve",
}
abbrev = abbrev_map.get(iface_type_str, iface_type_str)
short_name = f"{abbrev} {iface_id}"
if is_icx:
# For ICX, match to existing brief entry
current_iface = None
for iname in interfaces:
if full_name.endswith(iname) or iname == full_name:
current_iface = iname
break
else:
# NOS: create new interface entry using abbreviated name
current_iface = short_name
is_up = link_state == "up"
# Check "line protocol is up/down" portion
proto_up = "line protocol is up" in line.lower()
if current_iface not in interfaces:
interfaces[current_iface] = {
"is_up": proto_up,
"is_enabled": link_state != "administratively down",
"description": "",
"mac_address": "",
"speed": 0,
"mtu": 0,
"last_flapped": -1.0,
}
continue
if current_iface and current_iface in interfaces:
stripped = line.strip()
low = stripped.lower()
if "port name is" in low or low.startswith("description:"):
if low.startswith("description:"):
desc = stripped.split(":", 1)[-1].strip()
else:
desc = stripped.split("is", 1)[-1].strip()
interfaces[current_iface]["description"] = desc[:200]
elif "hardware is" in low and "address is" in low:
mac_match = re.search(r"address is\s+([0-9a-fA-F.:]+)", stripped)
if mac_match:
interfaces[current_iface]["mac_address"] = mac_match.group(1)
elif low.startswith("mtu") or "mtu " in low:
mtu_match = re.search(r"mtu\s+(\d+)", stripped, re.IGNORECASE)
if mtu_match:
interfaces[current_iface]["mtu"] = int(mtu_match.group(1))
elif "linespeed actual" in low:
# NOS: "LineSpeed Actual : 40000 Mbit"
speed_match = re.search(r":\s*(\d+)\s*Mbit", stripped, re.IGNORECASE)
if speed_match:
interfaces[current_iface]["speed"] = int(speed_match.group(1))
except Exception as exc:
log.debug(" Detailed interface parsing failed: %s", exc)
return interfaces
def _netmiko_parse_interfaces_ip(conn: ConnectHandler, driver: str) -> dict:
"""Parse IP addresses from CLI output.
Handles both ICX and NOS formats.
Returns dict matching NAPALM get_interfaces_ip() format:
{iface_name: {ipv4: {addr: {prefix_length: N}}, ipv6: {addr: {prefix_length: N}}}}
"""
interfaces_ip: dict[str, dict] = {}
# 'show ip interface' — works on both ICX and NOS
# ICX: "Ve 1 192.168.1.250 YES NVRAM up up default-vrf"
# NOS: "Interface IP-Address Status Protocol"
try:
output = conn.send_command("show ip interface")
for line in output.splitlines():
# Match lines with an IP address
ip_match = re.search(r"(\d+\.\d+\.\d+\.\d+)(?:/(\d+))?", line)
if not ip_match:
continue
# Skip header lines and lines that look like headers
if "IP-Address" in line or "Interface" in line.split()[0:1]:
continue
ip = ip_match.group(1)
if ip.startswith("0.") or ip == "127.0.0.1":
continue
prefix = int(ip_match.group(2)) if ip_match.group(2) else None
# Extract interface name — everything before the IP
before_ip = line[:ip_match.start()].strip()
# Interface names may have spaces: "Ve 1", "Ve 20", "mgmt 1"
iface_name = before_ip.strip()
if not iface_name:
continue
# If prefix not in output, try to get it from running config later
if prefix is None:
prefix = 24 # default fallback, will try to refine below
interfaces_ip.setdefault(iface_name, {"ipv4": {}, "ipv6": {}})
interfaces_ip[iface_name]["ipv4"][ip] = {"prefix_length": prefix}
except Exception as exc:
log.debug(" 'show ip interface' failed: %s", exc)
# Also try running-config to get accurate prefix lengths.
# NOS has nested interfaces under "rbridge-id 1" so pipe-filtered
# includes won't catch them. Parse the full running config instead.
try:
output = conn.send_command("show running-config", read_timeout=120)
current_iface = None
for line in output.splitlines():
stripped = line.strip()
# Match "interface Ve 40", "interface Management 1/0", etc.
iface_match = re.match(r"^interface\s+(\S+(?:\s+\S+)?)", stripped, re.IGNORECASE)
if iface_match:
current_iface = iface_match.group(1).strip()
continue
# Reset on non-indented lines that aren't "interface"
if not line.startswith(" ") and not line.startswith("\t") and stripped and not stripped.startswith("!"):
if not stripped.lower().startswith("interface"):
current_iface = None
continue
if current_iface and "ip address" in stripped.lower():
ip_match = re.search(
r"ip address\s+(\d+\.\d+\.\d+\.\d+)[/\s]+(\d+\.[\d.]+|\d+)", stripped,
re.IGNORECASE,
)
if ip_match:
ip = ip_match.group(1)
mask_or_prefix = ip_match.group(2)
if "." in mask_or_prefix:
prefix = sum(bin(int(x)).count("1") for x in mask_or_prefix.split("."))
else:
prefix = int(mask_or_prefix)
# Normalize interface name: "Ve 40" vs "ve 40"
normalized = current_iface
for existing_name in interfaces_ip:
if existing_name.lower().replace(" ", "") == normalized.lower().replace(" ", ""):
normalized = existing_name
break
interfaces_ip.setdefault(normalized, {"ipv4": {}, "ipv6": {}})
interfaces_ip[normalized]["ipv4"][ip] = {"prefix_length": prefix}
except Exception as exc:
log.debug(" Running-config IP parsing failed: %s", exc)
# IPv6
try:
output = conn.send_command("show ipv6 interface brief")
current_iface = None
for line in output.splitlines():
parts = line.split()
if not parts:
continue
# Interface line (not indented, not an address)
if not line.startswith(" ") and not re.match(r"^[0-9a-f]{4}:", parts[0], re.IGNORECASE):
iface_match = re.match(r"^(\S+(?:\s+\d+)?)", line)
if iface_match:
current_iface = iface_match.group(1).strip()
# IPv6 address line
ipv6_match = re.search(r"([0-9a-fA-F:]+(?:::[0-9a-fA-F:]*)?)/(\d+)", line)
if ipv6_match and current_iface:
ipv6 = ipv6_match.group(1)
prefix = int(ipv6_match.group(2))
if not ipv6.lower().startswith("fe80"):
interfaces_ip.setdefault(current_iface, {"ipv4": {}, "ipv6": {}})
interfaces_ip[current_iface]["ipv6"][ipv6] = {"prefix_length": prefix}
except Exception as exc:
log.debug(" IPv6 interface parsing failed: %s", exc)
return interfaces_ip
def _netmiko_parse_lldp(conn: ConnectHandler, driver: str) -> dict:
"""Parse LLDP neighbor data from CLI output.
Handles both ICX and NOS LLDP output formats.
Returns dict matching NAPALM get_lldp_neighbors_detail() format:
{local_iface: [{"parent_interface": "", "remote_system_name": "...", ...}]}
"""
lldp: dict[str, list] = {}
try:
output = conn.send_command("show lldp neighbors detail")
except Exception:
try:
output = conn.send_command("show lldp neighbors")
except Exception:
return lldp
if not output:
return lldp
# Both ICX and NOS use similar LLDP detail format:
# ICX: "Local port: 1/1/1" / NOS: "Local Interface: TenGigabitEthernet 0/0"
# Both: "+ Chassis ID (MAC address): xxxx.xxxx.xxxx"
# Both: "+ Port ID (interface name): ..."
# Both: "+ System name : ..."
# Entries separated by "Local port:" / "Local Interface:" headers
current_local = ""
entry: dict[str, str] = {}
def _flush_entry():
nonlocal entry
if entry and current_local:
lldp.setdefault(current_local, []).append({
"parent_interface": current_local,
"remote_system_name": entry.get("system_name", ""),
"remote_port": entry.get("port_id", "").strip('"'),
"remote_port_description": entry.get("port_description", "").strip('"'),
"remote_chassis_id": entry.get("chassis_id", ""),
"remote_system_description": entry.get("system_description", ""),
"remote_system_capab": [],
"remote_system_enable_capab": [],
})
entry = {}
for line in output.splitlines():
stripped = line.strip()
low = stripped.lower()
# New neighbor block — ICX: "Local port: 1/1/1"
# NOS: "Local Interface: Fo 1/0/1 (Local Interface MAC: ...)"
# NOS: "Neighbors for Interface Fo 1/0/1" (section header)
if low.startswith("neighbors for interface"):
_flush_entry()
current_local = stripped.split("Interface", 1)[1].strip()
continue
if "local port:" in low or "local interface:" in low:
_flush_entry()
raw = stripped.split(":", 1)[1].strip()
# NOS appends "(Local Interface MAC: xxxx.xxxx.xxxx)" — strip it
paren_idx = raw.find("(")
if paren_idx > 0:
raw = raw[:paren_idx].strip()
current_local = raw
continue
if not stripped:
continue
# Skip section headers
if stripped in ("MANDATORY TLVs", "OPTIONAL TLVs", "DCBX TLVs") or stripped.startswith("==="):
continue
# ICX uses "+ Field : value" format; NOS uses "Field: value"
# Normalize: strip leading "+" and whitespace
clean = stripped.lstrip("+ ").strip()
clean_low = clean.lower()
if clean_low.startswith("chassis id"):
# "+ Chassis ID (MAC address): 0013.c602.6961"
val = clean.split(":", 1)[-1].strip() if ":" in clean else ""
if val:
entry["chassis_id"] = val
elif clean_low.startswith("port id"):
val = clean.split(":", 1)[-1].strip() if ":" in clean else ""
if val:
entry["port_id"] = val
elif clean_low.startswith("remote interface"):
# NOS: "Remote Interface: 748e.f8e9.d41b (Remote Interface MAC: ...)"
# The value is a MAC or port name
raw = clean.split(":", 1)[-1].strip() if ":" in clean else ""
paren_idx = raw.find("(")
if paren_idx > 0:
raw = raw[:paren_idx].strip()
if raw and "remote_port_id" not in entry:
entry["remote_port_id"] = raw
elif clean_low.startswith("port interface description"):
# NOS: "Port Interface Description: 40GigabitEthernet1/2/1"
val = clean.split(":", 1)[-1].strip() if ":" in clean else ""
if val:
entry["port_description"] = val.strip('"')
# On NOS, this is often the best port identifier
if not entry.get("port_id"):
entry["port_id"] = val.strip('"')
elif clean_low.startswith("system name"):
val = clean.split(":", 1)[-1].strip() if ":" in clean else ""
if val:
entry["system_name"] = val.strip('"')
elif clean_low.startswith("system description"):
val = clean.split(":", 1)[-1].strip() if ":" in clean else ""
if val:
entry["system_description"] = val.strip('"')
elif clean_low.startswith("port description"):
val = clean.split(":", 1)[-1].strip() if ":" in clean else ""
if val:
entry["port_description"] = val.strip('"')
# Flush final entry
_flush_entry()
return lldp
# ---------------------------------------------------------------------------
# Device reference helper (rich references for Diode reconciler)
# ---------------------------------------------------------------------------
@ -425,17 +1046,20 @@ def build_interface_entities(interfaces: dict, hostname: str, model: str,
speed = iface_data.get("speed", 0)
iface_type = map_interface_type(name, speed)
iface = Interface(
mac = iface_data.get("mac_address") or ""
iface_kwargs = dict(
device=dev_ref,
name=name,
type=iface_type,
enabled=iface_data.get("is_enabled", True),
mac_address=iface_data.get("mac_address") or "",
mtu=iface_data.get("mtu") or 0,
speed=speed * 1000 if speed else 0, # NAPALM Mbps → NetBox Kbps
description=iface_data.get("description") or "",
tags=["network-collector"],
)
if mac:
iface_kwargs["primary_mac_address"] = mac
iface = Interface(**iface_kwargs)
entities.append(Entity(interface=iface))
return entities
@ -823,27 +1447,45 @@ def collect_all_entities(inventory: dict, env_file: str = ".env") -> tuple[list[
log.info("Connecting to %s (driver=%s, role=%s)...", host, driver, role)
# --- NAPALM collection ---
try:
dev = connect_device(host, driver, username, password, secret,
timeout, optional_args)
except Exception as exc:
log.error("Failed to connect to %s: %s", host, exc)
continue
# --- Decide collection path: NAPALM or Netmiko fallback ---
use_netmiko = driver in NETMIKO_ONLY_DRIVERS
try:
napalm_data = collect_napalm_data(dev)
finally:
if use_netmiko:
log.info(" Using Netmiko fallback (no NAPALM driver for %s)", driver)
try:
dev.close()
except Exception:
pass
conn = connect_netmiko(host, driver, username, password, secret, timeout)
except Exception as exc:
log.error("Failed to connect to %s via Netmiko: %s", host, exc)
continue
try:
collected_data = collect_netmiko_data(conn, driver)
finally:
try:
conn.disconnect()
except Exception:
pass
else:
# --- NAPALM collection ---
try:
dev = connect_device(host, driver, username, password, secret,
timeout, optional_args)
except Exception as exc:
log.error("Failed to connect to %s: %s", host, exc)
continue
if not napalm_data.get("facts"):
try:
collected_data = collect_napalm_data(dev)
finally:
try:
dev.close()
except Exception:
pass
if not collected_data.get("facts"):
log.error("No facts for %s, skipping", host)
continue
facts = napalm_data["facts"]
facts = collected_data["facts"]
hostname = facts.get("hostname") or host
model = facts.get("model") or "Unknown"
vendor = facts.get("vendor") or DRIVER_TO_MANUFACTURER.get(driver, "Unknown")
@ -855,47 +1497,47 @@ def collect_all_entities(inventory: dict, env_file: str = ".env") -> tuple[list[
entities.append(build_device_entity(facts, driver, role, site_name, host))
# Interface entities
if napalm_data.get("interfaces"):
if collected_data.get("interfaces"):
entities.extend(build_interface_entities(
napalm_data["interfaces"], hostname, model, vendor, role, site_name
collected_data["interfaces"], hostname, model, vendor, role, site_name
))
# IP entities (IPv4 + IPv6)
if napalm_data.get("interfaces_ip"):
if collected_data.get("interfaces_ip"):
entities.extend(build_ip_entities(
napalm_data["interfaces_ip"], hostname, model, vendor, role, site_name
collected_data["interfaces_ip"], hostname, model, vendor, role, site_name
))
# Prefix entities from discovered IPs
entities.extend(build_prefix_entities(
napalm_data["interfaces_ip"], site_name
collected_data["interfaces_ip"], site_name
))
# VLAN entities
if napalm_data.get("vlans"):
entities.extend(build_vlan_entities(napalm_data["vlans"], site_name))
if collected_data.get("vlans"):
entities.extend(build_vlan_entities(collected_data["vlans"], site_name))
# VRF entities
if napalm_data.get("network_instances"):
entities.extend(build_vrf_entities(napalm_data["network_instances"]))
if collected_data.get("network_instances"):
entities.extend(build_vrf_entities(collected_data["network_instances"]))
# Config entity
if napalm_data.get("config"):
if collected_data.get("config"):
config_entity = build_config_entity(
napalm_data["config"], hostname, model, vendor, role, site_name
collected_data["config"], hostname, model, vendor, role, site_name
)
if config_entity:
entities.append(config_entity)
# LLDP neighbors (saved for cable building later)
if napalm_data.get("lldp_neighbors"):
lldp_all[hostname] = napalm_data["lldp_neighbors"]
if collected_data.get("lldp_neighbors"):
lldp_all[hostname] = collected_data["lldp_neighbors"]
# BGP data (saved for plugin API push later)
if napalm_data.get("bgp_neighbors"):
bgp_all[hostname] = napalm_data["bgp_neighbors"]
if collected_data.get("bgp_neighbors"):
bgp_all[hostname] = collected_data["bgp_neighbors"]
# --- pyATS collection (optional) ---
if HAS_PYATS and driver in ("ios", "iosxr", "nxos", "nxos_ssh"):
# --- pyATS collection (optional, NAPALM devices only) ---
if not use_netmiko and HAS_PYATS and driver in ("ios", "iosxr", "nxos", "nxos_ssh"):
log.info(" Running pyATS parsers...")
pyats_data = collect_pyats_data(host, driver, username, password, secret)

464
collectors/pbs_collector.py Normal file
View File

@ -0,0 +1,464 @@
#!/usr/bin/env python3
"""Proxmox Backup Server collector for NetBox via Diode SDK.
Discovers PBS hosts, network interfaces, IPs, and datastores
and ingests them into NetBox through the Diode pipeline.
Usage:
python collectors/pbs_collector.py --dry-run
python collectors/pbs_collector.py
"""
import argparse
import logging
import os
import sys
from proxmoxer import ProxmoxAPI
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
Device,
DeviceRole,
DeviceType,
Entity,
Interface,
IPAddress,
Manufacturer,
Platform,
Service,
Site,
)
log = logging.getLogger("pbs-collector")
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
PBS_MANUFACTURER = "Proxmox Server Solutions GmbH"
PBS_MODEL = "Proxmox Backup Server"
IFACE_TYPE_MAP = {
"eth": "1000base-t",
"bond": "lag",
"bridge": "bridge",
"vlan": "virtual",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_diode_config() -> dict:
cfg = {
"diode_target": os.getenv("DIODE_TARGET", "grpc://localhost:8080/diode"),
"client_id": os.getenv("INGESTER_CLIENT_ID", os.getenv("DIODE_CLIENT_ID", "diode-ingester")),
"client_secret": os.getenv("INGESTER_CLIENT_SECRET", os.getenv("DIODE_CLIENT_SECRET")),
}
if not cfg["client_secret"]:
log.error("Missing required env var: INGESTER_CLIENT_SECRET or DIODE_CLIENT_SECRET")
sys.exit(1)
return cfg
def get_pbs_hosts() -> list[dict]:
"""Build list of PBS host configs from numbered env vars.
Supports PBS_HOST_1/PBS_USER_1/... through PBS_HOST_N.
Falls back to legacy single PBS_HOST if no numbered vars exist.
"""
hosts = []
misses = 0
for i in range(1, 100):
host = os.getenv(f"PBS_HOST_{i}")
if host is None:
misses += 1
if misses >= 3:
break
continue
misses = 0
hosts.append({
"pbs_host": host,
"pbs_user": os.getenv(f"PBS_USER_{i}", os.getenv("PBS_USER", "root@pam")),
"pbs_token_name": os.getenv(f"PBS_TOKEN_NAME_{i}", os.getenv("PBS_TOKEN_NAME")),
"pbs_token_value": os.getenv(f"PBS_TOKEN_VALUE_{i}", os.getenv("PBS_TOKEN_VALUE")),
"pbs_verify_ssl": os.getenv(
f"PBS_VERIFY_SSL_{i}", os.getenv("PBS_VERIFY_SSL", "false")
).lower() in ("true", "1", "yes"),
"pbs_port": int(os.getenv(f"PBS_PORT_{i}", os.getenv("PBS_PORT", "8007"))),
"site_name": os.getenv(f"PBS_SITE_{i}", os.getenv("SITE_NAME", "main")),
})
# Backward compat with single PBS_HOST
legacy_host = os.getenv("PBS_HOST")
if legacy_host:
already_listed = any(h["pbs_host"] == legacy_host for h in hosts)
if not already_listed:
hosts.insert(0, {
"pbs_host": legacy_host,
"pbs_user": os.getenv("PBS_USER", "root@pam"),
"pbs_token_name": os.getenv("PBS_TOKEN_NAME"),
"pbs_token_value": os.getenv("PBS_TOKEN_VALUE"),
"pbs_verify_ssl": os.getenv("PBS_VERIFY_SSL", "false").lower() in ("true", "1", "yes"),
"pbs_port": int(os.getenv("PBS_PORT", "8007")),
"site_name": os.getenv("SITE_NAME", "main"),
})
if not hosts:
log.error("No PBS hosts configured (set PBS_HOST or PBS_HOST_1)")
sys.exit(1)
for i, h in enumerate(hosts):
missing = [k for k in ("pbs_host", "pbs_token_name", "pbs_token_value") if not h.get(k)]
if missing:
log.error("PBS host %d (%s): missing %s", i + 1, h.get("pbs_host", "?"), ", ".join(missing))
sys.exit(1)
return hosts
# ---------------------------------------------------------------------------
# PBS connection
# ---------------------------------------------------------------------------
def connect_pbs(config: dict) -> ProxmoxAPI:
"""Create and return a ProxmoxAPI connection to PBS."""
return ProxmoxAPI(
config["pbs_host"],
service="PBS",
port=config["pbs_port"],
user=config["pbs_user"],
token_name=config["pbs_token_name"],
token_value=config["pbs_token_value"],
verify_ssl=config["pbs_verify_ssl"],
backend="https",
)
# ---------------------------------------------------------------------------
# Data collection (pure PBS API calls)
# ---------------------------------------------------------------------------
def collect_node_status(pbs: ProxmoxAPI) -> dict:
return pbs.nodes("localhost").status.get()
def collect_node_networks(pbs: ProxmoxAPI) -> list[dict]:
return pbs.nodes("localhost").network.get()
def collect_datastores(pbs: ProxmoxAPI) -> list[dict]:
return pbs.admin.datastore.get()
def collect_datastore_usage(pbs: ProxmoxAPI) -> list[dict]:
try:
return pbs.status("datastore-usage").get()
except Exception as exc:
log.debug("Datastore usage endpoint unavailable: %s", exc)
return []
def collect_datastore_snapshots(pbs: ProxmoxAPI, store: str) -> list[dict]:
try:
return pbs.admin.datastore(store).snapshots.get()
except Exception as exc:
log.debug("Cannot get snapshots for %s: %s", store, exc)
return []
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _device_ref(hostname: str, site_name: str) -> Device:
return Device(
name=hostname,
device_type=DeviceType(
model=PBS_MODEL,
manufacturer=Manufacturer(name=PBS_MANUFACTURER),
),
role=DeviceRole(name="Backup Server"),
site=Site(name=site_name),
)
def _map_interface_type(pve_type: str, iface_name: str) -> str:
if pve_type in IFACE_TYPE_MAP:
return IFACE_TYPE_MAP[pve_type]
if iface_name.startswith(("eno", "enp", "ens", "eth")):
return "1000base-t"
if iface_name.startswith("vmbr"):
return "bridge"
return "other"
def _netmask_to_prefix(mask: str) -> int:
try:
return sum(bin(int(o)).count("1") for o in mask.split("."))
except (ValueError, AttributeError):
return 24
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def build_device_entity(hostname: str, node_status: dict, site_name: str) -> Entity:
cpuinfo = node_status.get("cpuinfo", {})
memory = node_status.get("memory", {})
mem_gb = memory.get("total", 0) / (1024 ** 3)
info = node_status.get("info", {})
pbs_version = info.get("version", "")
kernel = node_status.get("kversion", "")
return Entity(device=Device(
name=hostname,
device_type=DeviceType(
model=PBS_MODEL,
manufacturer=Manufacturer(name=PBS_MANUFACTURER),
),
role=DeviceRole(name="Backup Server"),
platform=Platform(
name="Proxmox Backup Server",
manufacturer=Manufacturer(name=PBS_MANUFACTURER),
),
site=Site(name=site_name),
status="active",
description=f"PBS {pbs_version}, kernel {kernel}" if pbs_version else "",
comments=f"CPU: {cpuinfo.get('model', 'N/A')}, "
f"Cores: {cpuinfo.get('cores', '?')}, "
f"Memory: {mem_gb:.1f} GB",
tags=["proxmox", "pbs"],
))
def build_interface_entities(hostname: str, interfaces: list[dict],
site_name: str) -> list[Entity]:
entities = []
for iface in interfaces:
name = iface.get("iface", "")
if name == "lo":
continue
pve_type = iface.get("type", "")
desc_parts = []
if iface.get("bridge_ports"):
desc_parts.append(f"bridge_ports: {iface['bridge_ports']}")
if iface.get("comments"):
desc_parts.append(iface["comments"])
entities.append(Entity(interface=Interface(
device=_device_ref(hostname, site_name),
name=name,
type=_map_interface_type(pve_type, name),
enabled=bool(iface.get("active", 0)),
mtu=iface.get("mtu"),
description=", ".join(desc_parts)[:200] if desc_parts else "",
tags=["proxmox", "pbs"],
)))
return entities
def build_ip_entities(hostname: str, interfaces: list[dict],
site_name: str) -> list[Entity]:
entities = []
for iface in interfaces:
name = iface.get("iface", "")
if name == "lo":
continue
cidr = iface.get("cidr")
address = iface.get("address")
netmask = iface.get("netmask")
if cidr:
ip_str = cidr
elif address and netmask:
ip_str = f"{address}/{_netmask_to_prefix(netmask)}"
elif address:
ip_str = f"{address}/24"
else:
continue
pve_type = iface.get("type", "")
entities.append(Entity(ip_address=IPAddress(
address=ip_str,
status="active",
assigned_object_interface=Interface(
device=_device_ref(hostname, site_name),
name=name,
type=_map_interface_type(pve_type, name),
),
tags=["proxmox", "pbs"],
)))
# IPv6
cidr6 = iface.get("cidr6")
if cidr6:
entities.append(Entity(ip_address=IPAddress(
address=cidr6,
status="active",
assigned_object_interface=Interface(
device=_device_ref(hostname, site_name),
name=name,
type=_map_interface_type(pve_type, name),
),
tags=["proxmox", "pbs"],
)))
return entities
def build_datastore_entities(hostname: str, datastores: list[dict],
usage_map: dict, site_name: str,
pbs: ProxmoxAPI) -> list[Entity]:
entities = []
for ds in datastores:
store_name = ds.get("name", ds.get("store", "unknown"))
usage = usage_map.get(store_name, {})
total = usage.get("total", 0)
used = usage.get("used", 0)
total_gb = total / (1024 ** 3) if total else 0
used_gb = used / (1024 ** 3) if used else 0
pct = (used / total * 100) if total else 0
path = ds.get("path", "N/A")
gc_schedule = ds.get("gc-schedule", "N/A")
# Count snapshots
try:
snapshots = collect_datastore_snapshots(pbs, store_name)
snap_count = len(snapshots)
except Exception:
snap_count = 0
log.info(" Datastore '%s': %.1f GB total, %.1f GB used (%.0f%%), %d snapshots",
store_name, total_gb, used_gb, pct, snap_count)
entities.append(Entity(service=Service(
device=_device_ref(hostname, site_name),
name=f"ds-{store_name}",
protocol="tcp",
ports=[8007],
description=f"PBS Datastore: {total_gb:.1f} GB total, {used_gb:.1f} GB used ({pct:.0f}%)",
comments=f"Path: {path}\nSnapshots: {snap_count}\nGC schedule: {gc_schedule}",
tags=["proxmox", "pbs", "datastore"],
)))
return entities
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def collect_all_entities(pbs: ProxmoxAPI, host_cfg: dict) -> list[Entity]:
site = host_cfg["site_name"]
entities: list[Entity] = []
node_status = collect_node_status(pbs)
hostname = node_status.get("hostname", host_cfg["pbs_host"])
log.info("PBS host: %s", hostname)
# Device
entities.append(build_device_entity(hostname, node_status, site))
# Interfaces and IPs
networks = collect_node_networks(pbs)
entities.extend(build_interface_entities(hostname, networks, site))
entities.extend(build_ip_entities(hostname, networks, site))
# Datastores as Services
datastores = collect_datastores(pbs)
log.info(" Datastores: %d", len(datastores))
usage_list = collect_datastore_usage(pbs)
usage_map = {u.get("store", u.get("name", "")): u for u in usage_list}
entities.extend(build_datastore_entities(hostname, datastores, usage_map, site, pbs))
return entities
def ingest_entities(entities: list[Entity], config: dict, dry_run: bool = False) -> None:
if dry_run:
client = DiodeDryRunClient(app_name="pbs-collector")
log.info("Dry-run mode: writing entities to stdout")
client.ingest(entities=entities)
return
with DiodeClient(
target=config["diode_target"],
client_id=config["client_id"],
client_secret=config["client_secret"],
app_name="pbs-collector",
app_version="0.1.0",
) as client:
log.info("Ingesting %d entities to %s ...", len(entities), config["diode_target"])
response = client.ingest(entities=entities)
if response.errors:
log.error("Ingestion errors: %s", response.errors)
else:
log.info("Ingestion successful")
def main():
parser = argparse.ArgumentParser(description="Proxmox Backup Server collector for NetBox")
parser.add_argument("--dry-run", action="store_true", help="Output entities without ingesting")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env", help="Path to .env file")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(levelname)-8s %(message)s",
)
load_dotenv(args.env_file)
diode_config = get_diode_config()
pbs_hosts = get_pbs_hosts()
all_entities: list[Entity] = []
for i, host_cfg in enumerate(pbs_hosts, 1):
log.info("=== PBS Host %d/%d: %s ===", i, len(pbs_hosts), host_cfg["pbs_host"])
try:
pbs = connect_pbs(host_cfg)
entities = collect_all_entities(pbs, host_cfg)
log.info("Collected %d entities from %s", len(entities), host_cfg["pbs_host"])
all_entities.extend(entities)
except Exception:
log.exception("Failed to collect from PBS host %s", host_cfg["pbs_host"])
log.info("Total: %d entities from %d PBS host(s)", len(all_entities), len(pbs_hosts))
if not all_entities:
log.warning("No entities collected. Exiting.")
return
ingest_entities(all_entities, diode_config, dry_run=args.dry_run)
log.info("Done.")
if __name__ == "__main__":
main()

View File

@ -108,27 +108,78 @@ def load_dotenv(path: str = ".env") -> None:
os.environ.setdefault(key, value)
def get_config() -> dict:
"""Read and validate configuration from environment variables."""
def get_diode_config() -> dict:
"""Read Diode connection config from environment variables."""
cfg = {
"pve_host": os.getenv("PVE_HOST"),
"pve_user": os.getenv("PVE_USER", "root@pam"),
"pve_token_name": os.getenv("PVE_TOKEN_NAME"),
"pve_token_value": os.getenv("PVE_TOKEN_VALUE"),
"pve_verify_ssl": os.getenv("PVE_VERIFY_SSL", "false").lower() in ("true", "1", "yes"),
"pve_port": int(os.getenv("PVE_PORT", "8006")),
"diode_target": os.getenv("DIODE_TARGET", "grpc://localhost:8080/diode"),
"client_id": os.getenv("INGESTER_CLIENT_ID", os.getenv("DIODE_CLIENT_ID", "diode-ingester")),
"client_secret": os.getenv("INGESTER_CLIENT_SECRET", os.getenv("DIODE_CLIENT_SECRET")),
"site_name": os.getenv("SITE_NAME", "main"),
}
missing = [k for k in ("pve_host", "pve_token_name", "pve_token_value", "client_secret") if not cfg[k]]
if missing:
log.error("Missing required env vars: %s", ", ".join(missing))
if not cfg["client_secret"]:
log.error("Missing required env var: INGESTER_CLIENT_SECRET or DIODE_CLIENT_SECRET")
sys.exit(1)
return cfg
def get_pve_hosts() -> list[dict]:
"""Build list of PVE host configs from numbered env vars.
Supports PVE_HOST_1/PVE_USER_1/... through PVE_HOST_N.
Falls back to legacy single PVE_HOST if no numbered vars exist.
"""
hosts = []
# Scan numbered PVE_HOST_1 through PVE_HOST_N (stop after 3 consecutive misses)
misses = 0
for i in range(1, 100):
host = os.getenv(f"PVE_HOST_{i}")
if host is None:
misses += 1
if misses >= 3:
break
continue
misses = 0
hosts.append({
"pve_host": host,
"pve_user": os.getenv(f"PVE_USER_{i}", os.getenv("PVE_USER", "root@pam")),
"pve_token_name": os.getenv(f"PVE_TOKEN_NAME_{i}", os.getenv("PVE_TOKEN_NAME")),
"pve_token_value": os.getenv(f"PVE_TOKEN_VALUE_{i}", os.getenv("PVE_TOKEN_VALUE")),
"pve_verify_ssl": os.getenv(
f"PVE_VERIFY_SSL_{i}", os.getenv("PVE_VERIFY_SSL", "false")
).lower() in ("true", "1", "yes"),
"pve_port": int(os.getenv(f"PVE_PORT_{i}", os.getenv("PVE_PORT", "8006"))),
"site_name": os.getenv(f"PVE_SITE_{i}", os.getenv("SITE_NAME", "main")),
})
# Also include legacy PVE_HOST if it exists and isn't already in the list
legacy_host = os.getenv("PVE_HOST")
if legacy_host:
already_listed = any(h["pve_host"] == legacy_host for h in hosts)
if not already_listed:
hosts.insert(0, {
"pve_host": legacy_host,
"pve_user": os.getenv("PVE_USER", "root@pam"),
"pve_token_name": os.getenv("PVE_TOKEN_NAME"),
"pve_token_value": os.getenv("PVE_TOKEN_VALUE"),
"pve_verify_ssl": os.getenv("PVE_VERIFY_SSL", "false").lower() in ("true", "1", "yes"),
"pve_port": int(os.getenv("PVE_PORT", "8006")),
"site_name": os.getenv("SITE_NAME", "main"),
})
if not hosts:
log.error("No PVE hosts configured (set PVE_HOST or PVE_HOST_1)")
sys.exit(1)
# Validate each host
for i, h in enumerate(hosts):
missing = [k for k in ("pve_host", "pve_token_name", "pve_token_value") if not h.get(k)]
if missing:
log.error("PVE host %d (%s): missing %s", i + 1, h.get("pve_host", "?"), ", ".join(missing))
sys.exit(1)
return hosts
# ---------------------------------------------------------------------------
# PVE connection
# ---------------------------------------------------------------------------
@ -375,7 +426,7 @@ def build_node_interface_entities(
name=name,
type=map_pve_interface_type(pve_type, name),
enabled=bool(iface.get("active", 0)),
mtu=iface.get("mtu"),
mtu=int(iface["mtu"]) if iface.get("mtu") else None,
description=_iface_description(iface),
tags=["proxmox"],
)))
@ -763,20 +814,28 @@ def main():
)
load_dotenv(args.env_file)
config = get_config()
diode_config = get_diode_config()
pve_hosts = get_pve_hosts()
log.info("Connecting to PVE at %s:%s ...", config["pve_host"], config["pve_port"])
prox = connect_pve(config)
all_entities: list[Entity] = []
log.info("Collecting entities from Proxmox VE...")
entities = collect_all_entities(prox, config)
log.info("Collected %d entities total", len(entities))
for i, host_cfg in enumerate(pve_hosts, 1):
log.info("=== PVE Host %d/%d: %s ===", i, len(pve_hosts), host_cfg["pve_host"])
try:
prox = connect_pve(host_cfg)
entities = collect_all_entities(prox, host_cfg)
log.info("Collected %d entities from %s", len(entities), host_cfg["pve_host"])
all_entities.extend(entities)
except Exception:
log.exception("Failed to collect from PVE host %s", host_cfg["pve_host"])
if not entities:
log.info("Total: %d entities from %d host(s)", len(all_entities), len(pve_hosts))
if not all_entities:
log.warning("No entities collected. Exiting.")
return
ingest_entities(entities, config, dry_run=args.dry_run)
ingest_entities(all_entities, diode_config, dry_run=args.dry_run)
log.info("Done.")

View File

@ -0,0 +1,758 @@
#!/usr/bin/env python3
"""UniFi collector for NetBox via Diode SDK.
Discovers Ubiquiti UniFi devices (UDM, switches, APs), their ports, radios,
VLANs, WLANs, and LLDP neighbors from the local UniFi Controller API and
ingests them into NetBox.
Supports:
- UDM Pro / UDM-SE / UDR / Cloud Key Gen2+ (UniFi OS)
- Legacy UniFi Controller (standalone)
Usage:
python collectors/unifi_collector.py --dry-run
python collectors/unifi_collector.py
"""
import argparse
import logging
import os
import sys
from unifi_controller_api import UnifiController
from netboxlabs.diode.sdk import DiodeClient
from netboxlabs.diode.sdk.ingester import (
Cable,
CableTermination,
CustomFieldValue,
Device,
DeviceRole,
DeviceType,
Entity,
GenericObject,
Interface,
IPAddress,
Manufacturer,
Platform,
Prefix,
Site,
VLAN,
VLANGroup,
WirelessLAN,
WirelessLANGroup,
)
log = logging.getLogger("unifi-collector")
# ---------------------------------------------------------------------------
# UniFi → NetBox mappings
# ---------------------------------------------------------------------------
# UniFi device type strings → NetBox role
DEVICE_TYPE_TO_ROLE = {
"ugw": "Firewall", # UniFi Gateway (USG, UDM, UDM-SE, etc.)
"usw": "Switch", # UniFi Switch
"uap": "Access Point", # UniFi AP
"uxg": "Firewall", # UniFi Next-Gen Gateway
"udm": "Firewall", # UniFi Dream Machine
"uck": "Server", # Cloud Key
}
# UniFi device type → NAPALM-style platform name
DEVICE_TYPE_TO_PLATFORM = {
"ugw": "Ubiquiti UniFi OS",
"usw": "Ubiquiti UniFi OS",
"uap": "Ubiquiti UniFi OS",
"uxg": "Ubiquiti UniFi OS",
"udm": "Ubiquiti UniFi OS",
"uck": "Ubiquiti UniFi OS",
}
# UniFi port speed → NetBox interface type
SPEED_TO_IFACE_TYPE = {
10: "1000base-t", # 10 Mbps — map to closest
100: "100base-tx",
1000: "1000base-t",
2500: "2.5gbase-t",
5000: "5gbase-t",
10000: "10gbase-t",
25000: "25gbase-x-sfp28",
40000: "40gbase-x-qsfpp",
}
# WiFi band codes → NetBox wireless interface types
RADIO_BAND_TO_TYPE = {
"ng": "ieee-802.11n", # 2.4 GHz
"na": "ieee-802.11ac", # 5 GHz
"ac": "ieee-802.11ac", # 5 GHz 802.11ac
"ax": "ieee-802.11ax", # WiFi 6
"6e": "ieee-802.11ax", # WiFi 6E
"be": "ieee-802.11ax", # WiFi 7 (closest mapping)
}
# WiFi band display names
RADIO_BAND_DISPLAY = {
"ng": "2.4 GHz",
"na": "5 GHz",
"ac": "5 GHz",
"ax": "WiFi 6",
"6e": "6 GHz",
"be": "WiFi 7",
}
# WiFi standard from band code
BAND_TO_STANDARD = {
"ng": "ieee-802.11n",
"na": "ieee-802.11ac",
"ac": "ieee-802.11ac",
"ax": "ieee-802.11ax",
"6e": "ieee-802.11ax",
"be": "ieee-802.11ax",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
if not os.path.isfile(path):
return
with open(path) as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
def get_config() -> dict:
return {
"host": os.environ.get("UNIFI_HOST", ""),
"user": os.environ.get("UNIFI_USER", ""),
"password": os.environ.get("UNIFI_PASSWORD", ""),
"site_id": os.environ.get("UNIFI_SITE", "default"),
"verify_ssl": os.environ.get("UNIFI_VERIFY_SSL", "false").lower() == "true",
"is_udm": os.environ.get("UNIFI_IS_UDM", "true").lower() == "true",
"netbox_site": os.environ.get("UNIFI_NETBOX_SITE", "main"),
}
# ---------------------------------------------------------------------------
# Device reference helper
# ---------------------------------------------------------------------------
def _device_ref(name: str, model: str, role: str, site_name: str) -> Device:
return Device(
name=name,
device_type=DeviceType(
model=model,
manufacturer=Manufacturer(name="Ubiquiti"),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
)
# ---------------------------------------------------------------------------
# Connection
# ---------------------------------------------------------------------------
def connect_unifi(cfg: dict) -> UnifiController:
host = cfg["host"]
if not host:
log.error("UNIFI_HOST not set")
sys.exit(1)
url = host if host.startswith("http") else f"https://{host}"
log.info("Connecting to UniFi controller at %s ...", url)
controller = UnifiController(
controller_url=url,
username=cfg["user"],
password=cfg["password"],
is_udm_pro=cfg["is_udm"],
verify_ssl=cfg["verify_ssl"],
)
log.info("Connected to UniFi controller")
return controller
# ---------------------------------------------------------------------------
# Data collection and entity building
# ---------------------------------------------------------------------------
def collect_all_entities(cfg: dict) -> list[Entity]:
controller = connect_unifi(cfg)
site_id = cfg["site_id"]
site_name = cfg["netbox_site"]
entities: list[Entity] = []
# Track device names for LLDP cable dedup
mac_to_device: dict[str, dict] = {}
all_lldp_entries: list[tuple[str, str, dict]] = [] # (device_name, local_port, lldp_entry)
# --- Devices (UDM, switches, APs) ---
log.info("Fetching devices from site '%s' ...", site_id)
try:
devices = controller.get_unifi_site_device(site_id, raw=True)
except Exception as exc:
log.error("Failed to fetch devices: %s", exc)
devices = []
if not devices:
log.warning("No devices returned from UniFi controller")
return entities
log.info("Found %d UniFi devices", len(devices))
for dev in devices:
try:
dev_entities, dev_lldp = _build_device_entities(dev, site_name, mac_to_device)
entities.extend(dev_entities)
all_lldp_entries.extend(dev_lldp)
except Exception as exc:
dev_name = dev.get("name") or dev.get("mac", "?")
log.error("Failed to process device %s: %s", dev_name, exc)
# --- Cables from LLDP ---
cable_entities = _build_cable_entities(all_lldp_entries, mac_to_device, site_name)
entities.extend(cable_entities)
# --- Networks / VLANs ---
log.info("Fetching network configurations ...")
try:
networks = controller.get_unifi_site_networkconf(site_id, raw=True)
log.info("Found %d networks", len(networks))
for net in networks:
try:
entities.extend(_build_network_entities(net, site_name))
except Exception as exc:
log.error("Failed to process network %s: %s",
net.get("name", "?"), exc)
except Exception as exc:
log.warning("Failed to fetch networks: %s", exc)
# --- WLANs ---
log.info("Fetching WLAN configurations ...")
try:
wlans = controller.get_unifi_site_wlanconf(site_id, raw=True)
log.info("Found %d WLANs", len(wlans))
for wlan in wlans:
try:
entities.extend(_build_wlan_entities(wlan, site_name))
except Exception as exc:
log.error("Failed to process WLAN %s: %s",
wlan.get("name", "?"), exc)
except Exception as exc:
log.warning("Failed to fetch WLANs: %s", exc)
return entities
# ---------------------------------------------------------------------------
# Device entity builder
# ---------------------------------------------------------------------------
def _build_device_entities(dev: dict, site_name: str,
mac_to_device: dict) -> tuple[list[Entity], list]:
"""Build Device + Interface entities from a UniFi device dict."""
entities: list[Entity] = []
lldp_entries: list[tuple[str, str, dict]] = []
mac = dev.get("mac", "")
name = dev.get("name") or dev.get("hostname") or mac
model_code = dev.get("model", "Unknown")
model_name = dev.get("model_name") or dev.get("model_in_lts") or model_code
dev_type = dev.get("type", "")
serial = dev.get("serial", "")
ip = dev.get("ip", "")
version = dev.get("version", "")
state = dev.get("state", 0)
adopted = dev.get("adopted", False)
role = DEVICE_TYPE_TO_ROLE.get(dev_type, "Network Device")
platform = DEVICE_TYPE_TO_PLATFORM.get(dev_type, "Ubiquiti UniFi OS")
# Device status
if state == 1 and adopted:
status = "active"
elif state == 0:
status = "offline"
else:
status = "planned"
# Track MAC → device info for LLDP
mac_to_device[mac.lower()] = {
"name": name,
"model": model_name,
"role": role,
}
# Custom fields
custom_fields = {}
if mac:
custom_fields["unifi_mac"] = CustomFieldValue(text=mac)
if version:
custom_fields["unifi_firmware"] = CustomFieldValue(text=version)
# --- Device entity ---
device_kwargs = dict(
name=name,
device_type=DeviceType(
model=model_name,
manufacturer=Manufacturer(name="Ubiquiti"),
),
role=DeviceRole(name=role),
site=Site(name=site_name),
platform=Platform(name=platform),
serial=serial[:50] if serial else "",
status=status,
tags=["unifi"],
)
if custom_fields:
device_kwargs["custom_fields"] = custom_fields
entities.append(Entity(device=Device(**device_kwargs)))
dev_ref = _device_ref(name, model_name, role, site_name)
# --- Management IP ---
if ip and ip != "0.0.0.0":
# Management interface
entities.append(Entity(interface=Interface(
device=dev_ref,
name="mgmt",
type="other",
mac_address=mac,
enabled=True,
description="Management interface",
tags=["unifi"],
)))
entities.append(Entity(ip_address=IPAddress(
address=f"{ip}/32",
status="active",
assigned_object_interface=Interface(
device=dev_ref,
name="mgmt",
type="other",
),
tags=["unifi"],
)))
# --- Switch ports (port_table) ---
port_table = dev.get("port_table") or []
for port in port_table:
port_entities, port_lldp = _build_port_entities(port, dev_ref, name, mac_to_device)
entities.extend(port_entities)
lldp_entries.extend(port_lldp)
# --- WiFi radios (radio_table) ---
radio_table = dev.get("radio_table") or []
radio_stats = dev.get("radio_table_stats") or []
# Merge stats into radio data
radio_stats_map = {}
for rs in radio_stats:
rname = rs.get("name") or rs.get("radio", "")
if rname:
radio_stats_map[rname] = rs
for radio in radio_table:
entities.extend(_build_radio_entities(radio, radio_stats_map, dev_ref))
# --- Ethernet interfaces (ethernet_table) ---
eth_table = dev.get("ethernet_table") or []
for eth in eth_table:
eth_mac = eth.get("mac", "")
eth_name = eth.get("name") or eth.get("label", "")
if eth_name and eth_name != "mgmt":
num_ports = eth.get("num_port", 0)
desc = f"{num_ports} ports" if num_ports else ""
entities.append(Entity(interface=Interface(
device=dev_ref,
name=eth_name[:64],
type="other",
mac_address=eth_mac,
enabled=True,
description=desc,
tags=["unifi"],
)))
# --- LLDP entries for cable discovery ---
lldp_info = dev.get("lldp_table") or dev.get("lldp_info") or []
for entry in lldp_info:
local_port = entry.get("local_port_name") or f"port{entry.get('local_port_idx', '?')}"
lldp_entries.append((name, local_port, entry))
return entities, lldp_entries
# ---------------------------------------------------------------------------
# Port entity builder
# ---------------------------------------------------------------------------
def _build_port_entities(port: dict, dev_ref: Device, dev_name: str,
mac_to_device: dict) -> tuple[list[Entity], list]:
"""Build Interface entities from a switch port_table entry."""
entities: list[Entity] = []
lldp_entries: list[tuple[str, str, dict]] = []
port_idx = port.get("port_idx", 0)
port_name = port.get("name") or f"Port {port_idx}"
mac = port.get("mac", "")
speed = int(port.get("speed", 0) or 0)
is_uplink = port.get("is_uplink", False)
up = port.get("up", False)
enabled = port.get("enable", True)
poe_enable = port.get("poe_enable", False)
poe_power = port.get("poe_power") or ""
media = port.get("media") or ""
sfp_found = port.get("sfp_found", False)
# Determine interface type from speed and media
if sfp_found or "SFP" in media.upper():
if speed >= 10000:
iface_type = "10gbase-x-sfpp"
elif speed >= 25000:
iface_type = "25gbase-x-sfp28"
else:
iface_type = "1000base-x-sfp"
else:
iface_type = SPEED_TO_IFACE_TYPE.get(speed, "1000base-t")
# Description parts
desc_parts = []
if is_uplink:
desc_parts.append("Uplink")
if poe_enable:
power_str = f" ({poe_power}W)" if poe_power else ""
desc_parts.append(f"PoE{power_str}")
entities.append(Entity(interface=Interface(
device=dev_ref,
name=port_name[:64],
type=iface_type,
mac_address=mac,
speed=speed * 1000 if speed else 0, # Mbps → Kbps
enabled=enabled,
description=", ".join(desc_parts)[:200] if desc_parts else "",
tags=["unifi"],
)))
# Collect LLDP from port if present
port_lldp = port.get("lldp_table") or []
for entry in port_lldp:
lldp_entries.append((dev_name, port_name, entry))
return entities, lldp_entries
# ---------------------------------------------------------------------------
# Radio entity builder
# ---------------------------------------------------------------------------
def _build_radio_entities(radio: dict, stats_map: dict,
dev_ref: Device) -> list[Entity]:
"""Build Interface entities from a WiFi radio_table entry."""
entities: list[Entity] = []
radio_name = radio.get("name") or radio.get("radio", "wifi0")
band = radio.get("radio", "ng")
channel = radio.get("channel") or ""
tx_power = radio.get("tx_power") or radio.get("tx_power_mode") or ""
ht = radio.get("ht") or ""
band_display = RADIO_BAND_DISPLAY.get(band, band)
iface_type = RADIO_BAND_TO_TYPE.get(band, "ieee-802.11n")
desc_parts = [band_display]
if channel:
desc_parts.append(f"ch{channel}")
if tx_power:
desc_parts.append(f"{tx_power}dBm")
if ht:
desc_parts.append(ht)
# Merge stats if available
stats = stats_map.get(radio_name, {})
num_sta = stats.get("user-num_sta") or stats.get("user_num_sta", 0)
if num_sta:
desc_parts.append(f"{num_sta} clients")
entities.append(Entity(interface=Interface(
device=dev_ref,
name=radio_name[:64],
type=iface_type,
enabled=True,
description=", ".join(desc_parts)[:200],
tags=["unifi", "wireless"],
)))
return entities
# ---------------------------------------------------------------------------
# Cable entity builder (from LLDP)
# ---------------------------------------------------------------------------
def _build_cable_entities(lldp_entries: list[tuple[str, str, dict]],
mac_to_device: dict,
site_name: str) -> list[Entity]:
"""Build Cable entities from LLDP neighbor data, deduplicating pairs."""
entities: list[Entity] = []
seen_cables: set[tuple] = set()
for dev_name, local_port, entry in lldp_entries:
chassis_id = (entry.get("chassis_id") or "").lower().replace("-", ":")
remote_port = entry.get("port_id") or entry.get("port_descr") or ""
chassis_name = entry.get("chassis_descr") or ""
if not chassis_id and not chassis_name:
continue
# Try to resolve remote device by MAC
remote_dev = mac_to_device.get(chassis_id, {})
remote_name = remote_dev.get("name") or chassis_name or chassis_id
remote_model = remote_dev.get("model", "Unknown")
remote_role = remote_dev.get("role", "Network Device")
# Dedup: sorted pair key
pair = tuple(sorted([
(dev_name, local_port),
(remote_name, remote_port),
]))
if pair in seen_cables:
continue
seen_cables.add(pair)
a_name, a_port = pair[0]
b_name, b_port = pair[1]
# Look up device info for both sides
a_info = None
b_info = None
for mac, info in mac_to_device.items():
if info["name"] == a_name:
a_info = info
if info["name"] == b_name:
b_info = info
a_model = a_info["model"] if a_info else "Unknown"
a_role = a_info["role"] if a_info else "Network Device"
b_model = b_info["model"] if b_info else "Unknown"
b_role = b_info["role"] if b_info else "Network Device"
a_ref = _device_ref(a_name, a_model, a_role, site_name)
b_ref = _device_ref(b_name, b_model, b_role, site_name)
entities.append(Entity(cable=Cable(
a_terminations=[CableTermination(
termination=GenericObject(
object_interface=Interface(
device=a_ref,
name=a_port[:64],
)
),
)],
b_terminations=[CableTermination(
termination=GenericObject(
object_interface=Interface(
device=b_ref,
name=b_port[:64],
)
),
)],
status="connected",
tags=["unifi", "lldp"],
)))
log.debug("Cable: %s:%s%s:%s", a_name, a_port, b_name, b_port)
log.info("Built %d cable entities from LLDP", len(entities))
return entities
# ---------------------------------------------------------------------------
# Network / VLAN entity builder
# ---------------------------------------------------------------------------
def _build_network_entities(net: dict, site_name: str) -> list[Entity]:
"""Build VLAN + Prefix entities from a UniFi network config."""
entities: list[Entity] = []
name = net.get("name", "")
purpose = net.get("purpose", "")
vlan_id = net.get("vlan")
vlan_enabled = net.get("vlan_enabled", False)
subnet = net.get("ip_subnet", "")
enabled = net.get("enabled", True)
if not name:
return entities
# VLAN entity (if VLAN tagging is enabled)
if vlan_enabled and vlan_id:
try:
vid = int(vlan_id)
except (ValueError, TypeError):
vid = None
if vid and 1 <= vid <= 4094:
entities.append(Entity(vlan=VLAN(
vid=vid,
name=name[:64],
group=VLANGroup(name="UniFi"),
site=Site(name=site_name),
status="active" if enabled else "reserved",
tags=["unifi"],
)))
# Prefix entity (if subnet is defined)
if subnet and "/" in subnet:
entities.append(Entity(prefix=Prefix(
prefix=subnet,
site=Site(name=site_name),
status="active",
description=f"UniFi network: {name} ({purpose})"[:200],
tags=["unifi"],
)))
return entities
# ---------------------------------------------------------------------------
# WLAN entity builder
# ---------------------------------------------------------------------------
def _build_wlan_entities(wlan: dict, site_name: str) -> list[Entity]:
"""Build WirelessLAN entities from a UniFi WLAN config."""
entities: list[Entity] = []
name = wlan.get("name", "")
ssid = wlan.get("name", "") # UniFi uses 'name' as the SSID
enabled = wlan.get("enabled", True)
security = wlan.get("security", "")
hide_ssid = wlan.get("hide_ssid", False)
wpa_mode = wlan.get("wpa_mode", "")
wpa3 = wlan.get("wpa3_support", False)
if not ssid:
return entities
# Determine auth type for NetBox
if security == "open":
auth_type = "open"
elif wpa3:
auth_type = "wpa3-personal"
elif "wpa2" in wpa_mode.lower() or security == "wpapsk":
auth_type = "wpa2-personal"
elif "wpa-enterprise" in security.lower() or "wpaeap" in security.lower():
auth_type = "wpa2-enterprise"
else:
auth_type = "wpa2-personal"
# Get VLAN if assigned
vlan_id = None
# UniFi WLANs link to networks via networkconf_id, but we don't have
# easy access to the VLAN ID without cross-referencing. Mark in description.
desc_parts = []
if hide_ssid:
desc_parts.append("Hidden SSID")
if security:
desc_parts.append(f"Security: {security}")
entities.append(Entity(wireless_lan=WirelessLAN(
ssid=ssid,
group=WirelessLANGroup(name="UniFi"),
status="active" if enabled else "disabled",
auth_type=auth_type,
description=", ".join(desc_parts)[:200] if desc_parts else "",
tags=["unifi"],
)))
return entities
# ---------------------------------------------------------------------------
# Ingest
# ---------------------------------------------------------------------------
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
if not entities:
log.warning("No entities to ingest")
return
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
client_id = os.environ.get("DIODE_CLIENT_ID",
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
os.environ.get("INGESTER_CLIENT_SECRET", ""))
if dry_run:
log.info("DRY RUN: %d entities would be ingested", len(entities))
for i, e in enumerate(entities):
log.info(" [%d] %s", i, e)
return
if not client_secret:
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
sys.exit(1)
log.info("Ingesting %d entities to %s ...", len(entities), target)
from netboxlabs.diode.sdk.ingester import create_message_chunks
with DiodeClient(
target=target,
client_id=client_id,
client_secret=client_secret,
app_name="unifi-collector",
app_version="0.1.0",
) as client:
chunks = create_message_chunks(entities)
for idx, chunk in enumerate(chunks):
resp = client.ingest(entities=chunk)
if resp.errors:
log.error("Chunk %d errors: %s", idx, resp.errors)
else:
log.info("Chunk %d: %d entities ingested", idx, len(chunk))
def main():
parser = argparse.ArgumentParser(description="UniFi collector for NetBox")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
parser.add_argument("--env-file", default=".env")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
load_dotenv(args.env_file)
cfg = get_config()
entities = collect_all_entities(cfg)
log.info("Total entities: %d", len(entities))
ingest_entities(entities, dry_run=args.dry_run)
log.info("Done!")
if __name__ == "__main__":
main()

View File

@ -264,7 +264,7 @@ def build_host_entities(si, site_name: str) -> tuple[list[Entity], dict]:
device=dev_ref,
name=pnic.device,
type=iface_type,
mac_address=pnic.mac or "",
primary_mac_address=pnic.mac or "",
speed=speed_mbps * 1000 if speed_mbps else 0,
enabled=True,
tags=["vmware"],
@ -284,7 +284,7 @@ def build_host_entities(si, site_name: str) -> tuple[list[Entity], dict]:
device=dev_ref,
name=vnic.device,
type="virtual",
mac_address=vnic.spec.mac if vnic.spec else "",
primary_mac_address=vnic.spec.mac if vnic.spec else "",
enabled=True,
tags=["vmware", "vmkernel"],
)))
@ -408,7 +408,7 @@ def build_vm_entities(si, site_name: str,
virtual_machine=vm_ref,
name=nic_name[:64],
enabled=device.connectable.connected if device.connectable else True,
mac_address=mac,
primary_mac_address=mac,
description=net_name[:200] if net_name else "",
tags=["vmware"],
)))