2026-02-28 16:10:12 -07:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
"""UniFi collector for NetBox via Diode SDK.
|
|
|
|
|
|
|
|
|
|
Discovers Ubiquiti UniFi devices (UDM, switches, APs), their ports, radios,
|
|
|
|
|
VLANs, WLANs, and LLDP neighbors from the local UniFi Controller API and
|
|
|
|
|
ingests them into NetBox.
|
|
|
|
|
|
|
|
|
|
Supports:
|
|
|
|
|
- UDM Pro / UDM-SE / UDR / Cloud Key Gen2+ (UniFi OS)
|
|
|
|
|
- Legacy UniFi Controller (standalone)
|
|
|
|
|
|
|
|
|
|
Usage:
|
|
|
|
|
python collectors/unifi_collector.py --dry-run
|
|
|
|
|
python collectors/unifi_collector.py
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import argparse
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
import sys
|
|
|
|
|
|
|
|
|
|
from unifi_controller_api import UnifiController
|
|
|
|
|
|
|
|
|
|
from netboxlabs.diode.sdk import DiodeClient
|
|
|
|
|
from netboxlabs.diode.sdk.ingester import (
|
|
|
|
|
Cable,
|
|
|
|
|
CableTermination,
|
|
|
|
|
CustomFieldValue,
|
|
|
|
|
Device,
|
|
|
|
|
DeviceRole,
|
|
|
|
|
DeviceType,
|
|
|
|
|
Entity,
|
|
|
|
|
GenericObject,
|
|
|
|
|
Interface,
|
|
|
|
|
IPAddress,
|
|
|
|
|
Manufacturer,
|
|
|
|
|
Platform,
|
|
|
|
|
Prefix,
|
|
|
|
|
Site,
|
|
|
|
|
VLAN,
|
|
|
|
|
VLANGroup,
|
|
|
|
|
WirelessLAN,
|
|
|
|
|
WirelessLANGroup,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
log = logging.getLogger("unifi-collector")
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# UniFi → NetBox mappings
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
# UniFi device type strings → NetBox role
|
|
|
|
|
DEVICE_TYPE_TO_ROLE = {
|
|
|
|
|
"ugw": "Firewall", # UniFi Gateway (USG, UDM, UDM-SE, etc.)
|
|
|
|
|
"usw": "Switch", # UniFi Switch
|
|
|
|
|
"uap": "Access Point", # UniFi AP
|
|
|
|
|
"uxg": "Firewall", # UniFi Next-Gen Gateway
|
|
|
|
|
"udm": "Firewall", # UniFi Dream Machine
|
|
|
|
|
"uck": "Server", # Cloud Key
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# UniFi device type → NAPALM-style platform name
|
|
|
|
|
DEVICE_TYPE_TO_PLATFORM = {
|
|
|
|
|
"ugw": "Ubiquiti UniFi OS",
|
|
|
|
|
"usw": "Ubiquiti UniFi OS",
|
|
|
|
|
"uap": "Ubiquiti UniFi OS",
|
|
|
|
|
"uxg": "Ubiquiti UniFi OS",
|
|
|
|
|
"udm": "Ubiquiti UniFi OS",
|
|
|
|
|
"uck": "Ubiquiti UniFi OS",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# UniFi port speed → NetBox interface type
|
|
|
|
|
SPEED_TO_IFACE_TYPE = {
|
|
|
|
|
10: "1000base-t", # 10 Mbps — map to closest
|
|
|
|
|
100: "100base-tx",
|
|
|
|
|
1000: "1000base-t",
|
|
|
|
|
2500: "2.5gbase-t",
|
|
|
|
|
5000: "5gbase-t",
|
|
|
|
|
10000: "10gbase-t",
|
|
|
|
|
25000: "25gbase-x-sfp28",
|
|
|
|
|
40000: "40gbase-x-qsfpp",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# WiFi band codes → NetBox wireless interface types
|
|
|
|
|
RADIO_BAND_TO_TYPE = {
|
|
|
|
|
"ng": "ieee-802.11n", # 2.4 GHz
|
|
|
|
|
"na": "ieee-802.11ac", # 5 GHz
|
|
|
|
|
"ac": "ieee-802.11ac", # 5 GHz 802.11ac
|
|
|
|
|
"ax": "ieee-802.11ax", # WiFi 6
|
|
|
|
|
"6e": "ieee-802.11ax", # WiFi 6E
|
|
|
|
|
"be": "ieee-802.11ax", # WiFi 7 (closest mapping)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# WiFi band display names
|
|
|
|
|
RADIO_BAND_DISPLAY = {
|
|
|
|
|
"ng": "2.4 GHz",
|
|
|
|
|
"na": "5 GHz",
|
|
|
|
|
"ac": "5 GHz",
|
|
|
|
|
"ax": "WiFi 6",
|
|
|
|
|
"6e": "6 GHz",
|
|
|
|
|
"be": "WiFi 7",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# WiFi standard from band code
|
|
|
|
|
BAND_TO_STANDARD = {
|
|
|
|
|
"ng": "ieee-802.11n",
|
|
|
|
|
"na": "ieee-802.11ac",
|
|
|
|
|
"ac": "ieee-802.11ac",
|
|
|
|
|
"ax": "ieee-802.11ax",
|
|
|
|
|
"6e": "ieee-802.11ax",
|
|
|
|
|
"be": "ieee-802.11ax",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Configuration
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_dotenv(path: str = ".env") -> None:
|
|
|
|
|
if not os.path.isfile(path):
|
|
|
|
|
return
|
|
|
|
|
with open(path) as fh:
|
|
|
|
|
for line in fh:
|
|
|
|
|
line = line.strip()
|
|
|
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
|
|
|
continue
|
|
|
|
|
key, _, val = line.partition("=")
|
|
|
|
|
os.environ.setdefault(key.strip(), val.strip().strip("\"'"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_config() -> dict:
|
|
|
|
|
return {
|
|
|
|
|
"host": os.environ.get("UNIFI_HOST", ""),
|
|
|
|
|
"user": os.environ.get("UNIFI_USER", ""),
|
|
|
|
|
"password": os.environ.get("UNIFI_PASSWORD", ""),
|
|
|
|
|
"site_id": os.environ.get("UNIFI_SITE", "default"),
|
|
|
|
|
"verify_ssl": os.environ.get("UNIFI_VERIFY_SSL", "false").lower() == "true",
|
|
|
|
|
"is_udm": os.environ.get("UNIFI_IS_UDM", "true").lower() == "true",
|
|
|
|
|
"netbox_site": os.environ.get("UNIFI_NETBOX_SITE", "main"),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Device reference helper
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _device_ref(name: str, model: str, role: str, site_name: str) -> Device:
|
|
|
|
|
return Device(
|
|
|
|
|
name=name,
|
|
|
|
|
device_type=DeviceType(
|
|
|
|
|
model=model,
|
|
|
|
|
manufacturer=Manufacturer(name="Ubiquiti"),
|
|
|
|
|
),
|
|
|
|
|
role=DeviceRole(name=role),
|
|
|
|
|
site=Site(name=site_name),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Connection
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def connect_unifi(cfg: dict) -> UnifiController:
|
|
|
|
|
host = cfg["host"]
|
|
|
|
|
if not host:
|
|
|
|
|
log.error("UNIFI_HOST not set")
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
url = host if host.startswith("http") else f"https://{host}"
|
|
|
|
|
|
|
|
|
|
log.info("Connecting to UniFi controller at %s ...", url)
|
|
|
|
|
controller = UnifiController(
|
|
|
|
|
controller_url=url,
|
|
|
|
|
username=cfg["user"],
|
|
|
|
|
password=cfg["password"],
|
|
|
|
|
is_udm_pro=cfg["is_udm"],
|
|
|
|
|
verify_ssl=cfg["verify_ssl"],
|
|
|
|
|
)
|
|
|
|
|
log.info("Connected to UniFi controller")
|
|
|
|
|
return controller
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Data collection and entity building
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def collect_all_entities(cfg: dict) -> list[Entity]:
|
|
|
|
|
controller = connect_unifi(cfg)
|
|
|
|
|
site_id = cfg["site_id"]
|
|
|
|
|
site_name = cfg["netbox_site"]
|
|
|
|
|
entities: list[Entity] = []
|
|
|
|
|
|
|
|
|
|
# Track device names for LLDP cable dedup
|
|
|
|
|
mac_to_device: dict[str, dict] = {}
|
|
|
|
|
all_lldp_entries: list[tuple[str, str, dict]] = [] # (device_name, local_port, lldp_entry)
|
|
|
|
|
|
|
|
|
|
# --- Devices (UDM, switches, APs) ---
|
|
|
|
|
log.info("Fetching devices from site '%s' ...", site_id)
|
|
|
|
|
try:
|
|
|
|
|
devices = controller.get_unifi_site_device(site_id, raw=True)
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
log.error("Failed to fetch devices: %s", exc)
|
|
|
|
|
devices = []
|
|
|
|
|
|
|
|
|
|
if not devices:
|
|
|
|
|
log.warning("No devices returned from UniFi controller")
|
|
|
|
|
return entities
|
|
|
|
|
|
|
|
|
|
log.info("Found %d UniFi devices", len(devices))
|
|
|
|
|
|
|
|
|
|
for dev in devices:
|
|
|
|
|
try:
|
|
|
|
|
dev_entities, dev_lldp = _build_device_entities(dev, site_name, mac_to_device)
|
|
|
|
|
entities.extend(dev_entities)
|
|
|
|
|
all_lldp_entries.extend(dev_lldp)
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
dev_name = dev.get("name") or dev.get("mac", "?")
|
|
|
|
|
log.error("Failed to process device %s: %s", dev_name, exc)
|
|
|
|
|
|
|
|
|
|
# --- Cables from LLDP ---
|
|
|
|
|
cable_entities = _build_cable_entities(all_lldp_entries, mac_to_device, site_name)
|
|
|
|
|
entities.extend(cable_entities)
|
|
|
|
|
|
|
|
|
|
# --- Networks / VLANs ---
|
|
|
|
|
log.info("Fetching network configurations ...")
|
|
|
|
|
try:
|
|
|
|
|
networks = controller.get_unifi_site_networkconf(site_id, raw=True)
|
|
|
|
|
log.info("Found %d networks", len(networks))
|
|
|
|
|
for net in networks:
|
|
|
|
|
try:
|
|
|
|
|
entities.extend(_build_network_entities(net, site_name))
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
log.error("Failed to process network %s: %s",
|
|
|
|
|
net.get("name", "?"), exc)
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
log.warning("Failed to fetch networks: %s", exc)
|
|
|
|
|
|
|
|
|
|
# --- WLANs ---
|
|
|
|
|
log.info("Fetching WLAN configurations ...")
|
|
|
|
|
try:
|
|
|
|
|
wlans = controller.get_unifi_site_wlanconf(site_id, raw=True)
|
|
|
|
|
log.info("Found %d WLANs", len(wlans))
|
|
|
|
|
for wlan in wlans:
|
|
|
|
|
try:
|
|
|
|
|
entities.extend(_build_wlan_entities(wlan, site_name))
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
log.error("Failed to process WLAN %s: %s",
|
|
|
|
|
wlan.get("name", "?"), exc)
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
log.warning("Failed to fetch WLANs: %s", exc)
|
|
|
|
|
|
|
|
|
|
return entities
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Device entity builder
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_device_entities(dev: dict, site_name: str,
|
|
|
|
|
mac_to_device: dict) -> tuple[list[Entity], list]:
|
|
|
|
|
"""Build Device + Interface entities from a UniFi device dict."""
|
|
|
|
|
entities: list[Entity] = []
|
|
|
|
|
lldp_entries: list[tuple[str, str, dict]] = []
|
|
|
|
|
|
|
|
|
|
mac = dev.get("mac", "")
|
|
|
|
|
name = dev.get("name") or dev.get("hostname") or mac
|
|
|
|
|
model_code = dev.get("model", "Unknown")
|
|
|
|
|
model_name = dev.get("model_name") or dev.get("model_in_lts") or model_code
|
|
|
|
|
dev_type = dev.get("type", "")
|
|
|
|
|
serial = dev.get("serial", "")
|
|
|
|
|
ip = dev.get("ip", "")
|
|
|
|
|
version = dev.get("version", "")
|
|
|
|
|
state = dev.get("state", 0)
|
|
|
|
|
adopted = dev.get("adopted", False)
|
|
|
|
|
|
|
|
|
|
role = DEVICE_TYPE_TO_ROLE.get(dev_type, "Network Device")
|
|
|
|
|
platform = DEVICE_TYPE_TO_PLATFORM.get(dev_type, "Ubiquiti UniFi OS")
|
|
|
|
|
|
|
|
|
|
# Device status
|
|
|
|
|
if state == 1 and adopted:
|
|
|
|
|
status = "active"
|
|
|
|
|
elif state == 0:
|
|
|
|
|
status = "offline"
|
|
|
|
|
else:
|
|
|
|
|
status = "planned"
|
|
|
|
|
|
|
|
|
|
# Track MAC → device info for LLDP
|
|
|
|
|
mac_to_device[mac.lower()] = {
|
|
|
|
|
"name": name,
|
|
|
|
|
"model": model_name,
|
|
|
|
|
"role": role,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# Custom fields
|
|
|
|
|
custom_fields = {}
|
|
|
|
|
if mac:
|
|
|
|
|
custom_fields["unifi_mac"] = CustomFieldValue(text=mac)
|
|
|
|
|
if version:
|
|
|
|
|
custom_fields["unifi_firmware"] = CustomFieldValue(text=version)
|
|
|
|
|
|
|
|
|
|
# --- Device entity ---
|
|
|
|
|
device_kwargs = dict(
|
|
|
|
|
name=name,
|
|
|
|
|
device_type=DeviceType(
|
|
|
|
|
model=model_name,
|
|
|
|
|
manufacturer=Manufacturer(name="Ubiquiti"),
|
|
|
|
|
),
|
|
|
|
|
role=DeviceRole(name=role),
|
|
|
|
|
site=Site(name=site_name),
|
|
|
|
|
platform=Platform(name=platform),
|
|
|
|
|
serial=serial[:50] if serial else "",
|
|
|
|
|
status=status,
|
|
|
|
|
tags=["unifi"],
|
|
|
|
|
)
|
|
|
|
|
if custom_fields:
|
|
|
|
|
device_kwargs["custom_fields"] = custom_fields
|
|
|
|
|
|
|
|
|
|
entities.append(Entity(device=Device(**device_kwargs)))
|
|
|
|
|
|
|
|
|
|
dev_ref = _device_ref(name, model_name, role, site_name)
|
|
|
|
|
|
|
|
|
|
# --- Management IP ---
|
|
|
|
|
if ip and ip != "0.0.0.0":
|
|
|
|
|
# Management interface
|
|
|
|
|
entities.append(Entity(interface=Interface(
|
|
|
|
|
device=dev_ref,
|
|
|
|
|
name="mgmt",
|
|
|
|
|
type="other",
|
|
|
|
|
mac_address=mac,
|
|
|
|
|
enabled=True,
|
|
|
|
|
description="Management interface",
|
|
|
|
|
tags=["unifi"],
|
|
|
|
|
)))
|
|
|
|
|
entities.append(Entity(ip_address=IPAddress(
|
|
|
|
|
address=f"{ip}/32",
|
|
|
|
|
status="active",
|
|
|
|
|
assigned_object_interface=Interface(
|
|
|
|
|
device=dev_ref,
|
|
|
|
|
name="mgmt",
|
|
|
|
|
type="other",
|
|
|
|
|
),
|
|
|
|
|
tags=["unifi"],
|
|
|
|
|
)))
|
|
|
|
|
|
|
|
|
|
# --- Switch ports (port_table) ---
|
|
|
|
|
port_table = dev.get("port_table") or []
|
|
|
|
|
for port in port_table:
|
|
|
|
|
port_entities, port_lldp = _build_port_entities(port, dev_ref, name, mac_to_device)
|
|
|
|
|
entities.extend(port_entities)
|
|
|
|
|
lldp_entries.extend(port_lldp)
|
|
|
|
|
|
|
|
|
|
# --- WiFi radios (radio_table) ---
|
|
|
|
|
radio_table = dev.get("radio_table") or []
|
|
|
|
|
radio_stats = dev.get("radio_table_stats") or []
|
|
|
|
|
# Merge stats into radio data
|
|
|
|
|
radio_stats_map = {}
|
|
|
|
|
for rs in radio_stats:
|
|
|
|
|
rname = rs.get("name") or rs.get("radio", "")
|
|
|
|
|
if rname:
|
|
|
|
|
radio_stats_map[rname] = rs
|
|
|
|
|
|
|
|
|
|
for radio in radio_table:
|
|
|
|
|
entities.extend(_build_radio_entities(radio, radio_stats_map, dev_ref))
|
|
|
|
|
|
|
|
|
|
# --- Ethernet interfaces (ethernet_table) ---
|
|
|
|
|
eth_table = dev.get("ethernet_table") or []
|
|
|
|
|
for eth in eth_table:
|
|
|
|
|
eth_mac = eth.get("mac", "")
|
|
|
|
|
eth_name = eth.get("name") or eth.get("label", "")
|
|
|
|
|
if eth_name and eth_name != "mgmt":
|
|
|
|
|
num_ports = eth.get("num_port", 0)
|
|
|
|
|
desc = f"{num_ports} ports" if num_ports else ""
|
|
|
|
|
entities.append(Entity(interface=Interface(
|
|
|
|
|
device=dev_ref,
|
|
|
|
|
name=eth_name[:64],
|
|
|
|
|
type="other",
|
|
|
|
|
mac_address=eth_mac,
|
|
|
|
|
enabled=True,
|
|
|
|
|
description=desc,
|
|
|
|
|
tags=["unifi"],
|
|
|
|
|
)))
|
|
|
|
|
|
|
|
|
|
# --- LLDP entries for cable discovery ---
|
|
|
|
|
lldp_info = dev.get("lldp_table") or dev.get("lldp_info") or []
|
|
|
|
|
for entry in lldp_info:
|
|
|
|
|
local_port = entry.get("local_port_name") or f"port{entry.get('local_port_idx', '?')}"
|
|
|
|
|
lldp_entries.append((name, local_port, entry))
|
|
|
|
|
|
|
|
|
|
return entities, lldp_entries
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Port entity builder
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_port_entities(port: dict, dev_ref: Device, dev_name: str,
|
|
|
|
|
mac_to_device: dict) -> tuple[list[Entity], list]:
|
|
|
|
|
"""Build Interface entities from a switch port_table entry."""
|
|
|
|
|
entities: list[Entity] = []
|
|
|
|
|
lldp_entries: list[tuple[str, str, dict]] = []
|
|
|
|
|
|
|
|
|
|
port_idx = port.get("port_idx", 0)
|
|
|
|
|
port_name = port.get("name") or f"Port {port_idx}"
|
|
|
|
|
mac = port.get("mac", "")
|
|
|
|
|
speed = int(port.get("speed", 0) or 0)
|
|
|
|
|
is_uplink = port.get("is_uplink", False)
|
|
|
|
|
up = port.get("up", False)
|
|
|
|
|
enabled = port.get("enable", True)
|
|
|
|
|
poe_enable = port.get("poe_enable", False)
|
|
|
|
|
poe_power = port.get("poe_power") or ""
|
|
|
|
|
media = port.get("media") or ""
|
|
|
|
|
sfp_found = port.get("sfp_found", False)
|
|
|
|
|
|
|
|
|
|
# Determine interface type from speed and media
|
|
|
|
|
if sfp_found or "SFP" in media.upper():
|
|
|
|
|
if speed >= 10000:
|
|
|
|
|
iface_type = "10gbase-x-sfpp"
|
|
|
|
|
elif speed >= 25000:
|
|
|
|
|
iface_type = "25gbase-x-sfp28"
|
|
|
|
|
else:
|
|
|
|
|
iface_type = "1000base-x-sfp"
|
|
|
|
|
else:
|
|
|
|
|
iface_type = SPEED_TO_IFACE_TYPE.get(speed, "1000base-t")
|
|
|
|
|
|
|
|
|
|
# Description parts
|
|
|
|
|
desc_parts = []
|
|
|
|
|
if is_uplink:
|
|
|
|
|
desc_parts.append("Uplink")
|
|
|
|
|
if poe_enable:
|
|
|
|
|
power_str = f" ({poe_power}W)" if poe_power else ""
|
|
|
|
|
desc_parts.append(f"PoE{power_str}")
|
|
|
|
|
|
|
|
|
|
entities.append(Entity(interface=Interface(
|
|
|
|
|
device=dev_ref,
|
|
|
|
|
name=port_name[:64],
|
|
|
|
|
type=iface_type,
|
|
|
|
|
mac_address=mac,
|
|
|
|
|
speed=speed * 1000 if speed else 0, # Mbps → Kbps
|
|
|
|
|
enabled=enabled,
|
|
|
|
|
description=", ".join(desc_parts)[:200] if desc_parts else "",
|
|
|
|
|
tags=["unifi"],
|
|
|
|
|
)))
|
|
|
|
|
|
|
|
|
|
# Collect LLDP from port if present
|
|
|
|
|
port_lldp = port.get("lldp_table") or []
|
|
|
|
|
for entry in port_lldp:
|
|
|
|
|
lldp_entries.append((dev_name, port_name, entry))
|
|
|
|
|
|
|
|
|
|
return entities, lldp_entries
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Radio entity builder
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_radio_entities(radio: dict, stats_map: dict,
|
|
|
|
|
dev_ref: Device) -> list[Entity]:
|
|
|
|
|
"""Build Interface entities from a WiFi radio_table entry."""
|
|
|
|
|
entities: list[Entity] = []
|
|
|
|
|
|
|
|
|
|
radio_name = radio.get("name") or radio.get("radio", "wifi0")
|
|
|
|
|
band = radio.get("radio", "ng")
|
|
|
|
|
channel = radio.get("channel") or ""
|
|
|
|
|
tx_power = radio.get("tx_power") or radio.get("tx_power_mode") or ""
|
|
|
|
|
ht = radio.get("ht") or ""
|
|
|
|
|
|
|
|
|
|
band_display = RADIO_BAND_DISPLAY.get(band, band)
|
|
|
|
|
iface_type = RADIO_BAND_TO_TYPE.get(band, "ieee-802.11n")
|
|
|
|
|
|
|
|
|
|
desc_parts = [band_display]
|
|
|
|
|
if channel:
|
|
|
|
|
desc_parts.append(f"ch{channel}")
|
|
|
|
|
if tx_power:
|
|
|
|
|
desc_parts.append(f"{tx_power}dBm")
|
|
|
|
|
if ht:
|
|
|
|
|
desc_parts.append(ht)
|
|
|
|
|
|
|
|
|
|
# Merge stats if available
|
|
|
|
|
stats = stats_map.get(radio_name, {})
|
|
|
|
|
num_sta = stats.get("user-num_sta") or stats.get("user_num_sta", 0)
|
|
|
|
|
if num_sta:
|
|
|
|
|
desc_parts.append(f"{num_sta} clients")
|
|
|
|
|
|
|
|
|
|
entities.append(Entity(interface=Interface(
|
|
|
|
|
device=dev_ref,
|
|
|
|
|
name=radio_name[:64],
|
|
|
|
|
type=iface_type,
|
|
|
|
|
enabled=True,
|
|
|
|
|
description=", ".join(desc_parts)[:200],
|
|
|
|
|
tags=["unifi", "wireless"],
|
|
|
|
|
)))
|
|
|
|
|
|
|
|
|
|
return entities
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Cable entity builder (from LLDP)
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_cable_entities(lldp_entries: list[tuple[str, str, dict]],
|
|
|
|
|
mac_to_device: dict,
|
|
|
|
|
site_name: str) -> list[Entity]:
|
|
|
|
|
"""Build Cable entities from LLDP neighbor data, deduplicating pairs."""
|
|
|
|
|
entities: list[Entity] = []
|
|
|
|
|
seen_cables: set[tuple] = set()
|
|
|
|
|
|
|
|
|
|
for dev_name, local_port, entry in lldp_entries:
|
|
|
|
|
chassis_id = (entry.get("chassis_id") or "").lower().replace("-", ":")
|
|
|
|
|
remote_port = entry.get("port_id") or entry.get("port_descr") or ""
|
|
|
|
|
chassis_name = entry.get("chassis_descr") or ""
|
|
|
|
|
|
|
|
|
|
if not chassis_id and not chassis_name:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# Try to resolve remote device by MAC
|
|
|
|
|
remote_dev = mac_to_device.get(chassis_id, {})
|
|
|
|
|
remote_name = remote_dev.get("name") or chassis_name or chassis_id
|
|
|
|
|
remote_model = remote_dev.get("model", "Unknown")
|
|
|
|
|
remote_role = remote_dev.get("role", "Network Device")
|
|
|
|
|
|
|
|
|
|
# Dedup: sorted pair key
|
|
|
|
|
pair = tuple(sorted([
|
|
|
|
|
(dev_name, local_port),
|
|
|
|
|
(remote_name, remote_port),
|
|
|
|
|
]))
|
|
|
|
|
if pair in seen_cables:
|
|
|
|
|
continue
|
|
|
|
|
seen_cables.add(pair)
|
|
|
|
|
|
|
|
|
|
a_name, a_port = pair[0]
|
|
|
|
|
b_name, b_port = pair[1]
|
|
|
|
|
|
|
|
|
|
# Look up device info for both sides
|
|
|
|
|
a_info = None
|
|
|
|
|
b_info = None
|
|
|
|
|
for mac, info in mac_to_device.items():
|
|
|
|
|
if info["name"] == a_name:
|
|
|
|
|
a_info = info
|
|
|
|
|
if info["name"] == b_name:
|
|
|
|
|
b_info = info
|
|
|
|
|
|
|
|
|
|
a_model = a_info["model"] if a_info else "Unknown"
|
|
|
|
|
a_role = a_info["role"] if a_info else "Network Device"
|
|
|
|
|
b_model = b_info["model"] if b_info else "Unknown"
|
|
|
|
|
b_role = b_info["role"] if b_info else "Network Device"
|
|
|
|
|
|
|
|
|
|
a_ref = _device_ref(a_name, a_model, a_role, site_name)
|
|
|
|
|
b_ref = _device_ref(b_name, b_model, b_role, site_name)
|
|
|
|
|
|
|
|
|
|
entities.append(Entity(cable=Cable(
|
|
|
|
|
a_terminations=[CableTermination(
|
|
|
|
|
termination=GenericObject(
|
|
|
|
|
object_interface=Interface(
|
|
|
|
|
device=a_ref,
|
|
|
|
|
name=a_port[:64],
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
|
)],
|
|
|
|
|
b_terminations=[CableTermination(
|
|
|
|
|
termination=GenericObject(
|
|
|
|
|
object_interface=Interface(
|
|
|
|
|
device=b_ref,
|
|
|
|
|
name=b_port[:64],
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
|
)],
|
|
|
|
|
status="connected",
|
|
|
|
|
tags=["unifi", "lldp"],
|
|
|
|
|
)))
|
|
|
|
|
log.debug("Cable: %s:%s ↔ %s:%s", a_name, a_port, b_name, b_port)
|
|
|
|
|
|
|
|
|
|
log.info("Built %d cable entities from LLDP", len(entities))
|
|
|
|
|
return entities
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Network / VLAN entity builder
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_network_entities(net: dict, site_name: str) -> list[Entity]:
|
|
|
|
|
"""Build VLAN + Prefix entities from a UniFi network config."""
|
|
|
|
|
entities: list[Entity] = []
|
|
|
|
|
|
|
|
|
|
name = net.get("name", "")
|
|
|
|
|
purpose = net.get("purpose", "")
|
|
|
|
|
vlan_id = net.get("vlan")
|
|
|
|
|
vlan_enabled = net.get("vlan_enabled", False)
|
|
|
|
|
subnet = net.get("ip_subnet", "")
|
|
|
|
|
enabled = net.get("enabled", True)
|
|
|
|
|
|
|
|
|
|
if not name:
|
|
|
|
|
return entities
|
|
|
|
|
|
|
|
|
|
# VLAN entity (if VLAN tagging is enabled)
|
|
|
|
|
if vlan_enabled and vlan_id:
|
|
|
|
|
try:
|
|
|
|
|
vid = int(vlan_id)
|
|
|
|
|
except (ValueError, TypeError):
|
|
|
|
|
vid = None
|
|
|
|
|
|
|
|
|
|
if vid and 1 <= vid <= 4094:
|
|
|
|
|
entities.append(Entity(vlan=VLAN(
|
|
|
|
|
vid=vid,
|
|
|
|
|
name=name[:64],
|
|
|
|
|
group=VLANGroup(name="UniFi"),
|
|
|
|
|
site=Site(name=site_name),
|
|
|
|
|
status="active" if enabled else "reserved",
|
|
|
|
|
tags=["unifi"],
|
|
|
|
|
)))
|
|
|
|
|
|
|
|
|
|
# Prefix entity (if subnet is defined)
|
|
|
|
|
if subnet and "/" in subnet:
|
|
|
|
|
entities.append(Entity(prefix=Prefix(
|
|
|
|
|
prefix=subnet,
|
2026-02-28 16:26:58 -07:00
|
|
|
scope_site=Site(name=site_name),
|
2026-02-28 16:10:12 -07:00
|
|
|
status="active",
|
|
|
|
|
description=f"UniFi network: {name} ({purpose})"[:200],
|
|
|
|
|
tags=["unifi"],
|
|
|
|
|
)))
|
|
|
|
|
|
|
|
|
|
return entities
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# WLAN entity builder
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_wlan_entities(wlan: dict, site_name: str) -> list[Entity]:
|
|
|
|
|
"""Build WirelessLAN entities from a UniFi WLAN config."""
|
|
|
|
|
entities: list[Entity] = []
|
|
|
|
|
|
|
|
|
|
name = wlan.get("name", "")
|
|
|
|
|
ssid = wlan.get("name", "") # UniFi uses 'name' as the SSID
|
|
|
|
|
enabled = wlan.get("enabled", True)
|
|
|
|
|
security = wlan.get("security", "")
|
|
|
|
|
hide_ssid = wlan.get("hide_ssid", False)
|
|
|
|
|
wpa_mode = wlan.get("wpa_mode", "")
|
|
|
|
|
wpa3 = wlan.get("wpa3_support", False)
|
|
|
|
|
|
|
|
|
|
if not ssid:
|
|
|
|
|
return entities
|
|
|
|
|
|
|
|
|
|
# Determine auth type for NetBox
|
|
|
|
|
if security == "open":
|
|
|
|
|
auth_type = "open"
|
|
|
|
|
elif wpa3:
|
|
|
|
|
auth_type = "wpa3-personal"
|
|
|
|
|
elif "wpa2" in wpa_mode.lower() or security == "wpapsk":
|
|
|
|
|
auth_type = "wpa2-personal"
|
|
|
|
|
elif "wpa-enterprise" in security.lower() or "wpaeap" in security.lower():
|
|
|
|
|
auth_type = "wpa2-enterprise"
|
|
|
|
|
else:
|
|
|
|
|
auth_type = "wpa2-personal"
|
|
|
|
|
|
|
|
|
|
# Get VLAN if assigned
|
|
|
|
|
vlan_id = None
|
|
|
|
|
# UniFi WLANs link to networks via networkconf_id, but we don't have
|
|
|
|
|
# easy access to the VLAN ID without cross-referencing. Mark in description.
|
|
|
|
|
|
|
|
|
|
desc_parts = []
|
|
|
|
|
if hide_ssid:
|
|
|
|
|
desc_parts.append("Hidden SSID")
|
|
|
|
|
if security:
|
|
|
|
|
desc_parts.append(f"Security: {security}")
|
|
|
|
|
|
|
|
|
|
entities.append(Entity(wireless_lan=WirelessLAN(
|
|
|
|
|
ssid=ssid,
|
|
|
|
|
group=WirelessLANGroup(name="UniFi"),
|
|
|
|
|
status="active" if enabled else "disabled",
|
|
|
|
|
auth_type=auth_type,
|
|
|
|
|
description=", ".join(desc_parts)[:200] if desc_parts else "",
|
|
|
|
|
tags=["unifi"],
|
|
|
|
|
)))
|
|
|
|
|
|
|
|
|
|
return entities
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Ingest
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
|
|
|
|
|
if not entities:
|
|
|
|
|
log.warning("No entities to ingest")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
|
|
|
|
|
client_id = os.environ.get("DIODE_CLIENT_ID",
|
|
|
|
|
os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
|
|
|
|
|
client_secret = os.environ.get("DIODE_CLIENT_SECRET",
|
|
|
|
|
os.environ.get("INGESTER_CLIENT_SECRET", ""))
|
|
|
|
|
|
|
|
|
|
if dry_run:
|
|
|
|
|
log.info("DRY RUN: %d entities would be ingested", len(entities))
|
|
|
|
|
for i, e in enumerate(entities):
|
|
|
|
|
log.info(" [%d] %s", i, e)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if not client_secret:
|
|
|
|
|
log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
log.info("Ingesting %d entities to %s ...", len(entities), target)
|
|
|
|
|
|
|
|
|
|
with DiodeClient(
|
|
|
|
|
target=target,
|
|
|
|
|
client_id=client_id,
|
|
|
|
|
client_secret=client_secret,
|
|
|
|
|
app_name="unifi-collector",
|
|
|
|
|
app_version="0.1.0",
|
|
|
|
|
) as client:
|
2026-02-28 16:49:25 -07:00
|
|
|
resp = client.ingest(entities=entities)
|
|
|
|
|
if resp.errors:
|
|
|
|
|
log.error("Ingestion errors: %s", resp.errors)
|
|
|
|
|
else:
|
|
|
|
|
log.info("Ingestion successful")
|
2026-02-28 16:10:12 -07:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
parser = argparse.ArgumentParser(description="UniFi collector for NetBox")
|
|
|
|
|
parser.add_argument("--dry-run", action="store_true")
|
|
|
|
|
parser.add_argument("--log-level", default="INFO",
|
|
|
|
|
choices=["DEBUG", "INFO", "WARNING", "ERROR"])
|
|
|
|
|
parser.add_argument("--env-file", default=".env")
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
|
|
|
|
logging.basicConfig(
|
|
|
|
|
level=getattr(logging, args.log_level),
|
|
|
|
|
format="%(asctime)s %(name)s %(levelname)s %(message)s",
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
load_dotenv(args.env_file)
|
|
|
|
|
cfg = get_config()
|
|
|
|
|
|
|
|
|
|
entities = collect_all_entities(cfg)
|
|
|
|
|
log.info("Total entities: %d", len(entities))
|
|
|
|
|
|
|
|
|
|
ingest_entities(entities, dry_run=args.dry_run)
|
|
|
|
|
log.info("Done!")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
main()
|