#!/usr/bin/env python3
"""UniFi collector for NetBox via Diode SDK.

Discovers Ubiquiti UniFi devices (UDM, switches, APs), their ports, radios,
VLANs, WLANs, and LLDP neighbors from the local UniFi Controller API and
ingests them into NetBox.

Supports:
  - UDM Pro / UDM-SE / UDR / Cloud Key Gen2+ (UniFi OS)
  - Legacy UniFi Controller (standalone)

Usage:
    python collectors/unifi_collector.py --dry-run
    python collectors/unifi_collector.py
"""

import argparse
import ipaddress
import logging
import os
import sys

from unifi_controller_api import UnifiController
from netboxlabs.diode.sdk import DiodeClient
from netboxlabs.diode.sdk.ingester import (
    Cable,
    CableTermination,
    CustomFieldValue,
    Device,
    DeviceRole,
    DeviceType,
    Entity,
    GenericObject,
    Interface,
    IPAddress,
    Manufacturer,
    Platform,
    Prefix,
    Site,
    VLAN,
    VLANGroup,
    WirelessLAN,
    WirelessLANGroup,
)

log = logging.getLogger("unifi-collector")

# ---------------------------------------------------------------------------
# UniFi → NetBox mappings
# ---------------------------------------------------------------------------

# UniFi device type strings → NetBox role
DEVICE_TYPE_TO_ROLE = {
    "ugw": "Firewall",      # UniFi Gateway (USG, UDM, UDM-SE, etc.)
    "usw": "Switch",        # UniFi Switch
    "uap": "Access Point",  # UniFi AP
    "uxg": "Firewall",      # UniFi Next-Gen Gateway
    "udm": "Firewall",      # UniFi Dream Machine
    "uck": "Server",        # Cloud Key
}

# UniFi device type → NAPALM-style platform name
DEVICE_TYPE_TO_PLATFORM = {
    "ugw": "Ubiquiti UniFi OS",
    "usw": "Ubiquiti UniFi OS",
    "uap": "Ubiquiti UniFi OS",
    "uxg": "Ubiquiti UniFi OS",
    "udm": "Ubiquiti UniFi OS",
    "uck": "Ubiquiti UniFi OS",
}

# UniFi port speed (Mbps) → NetBox interface type
SPEED_TO_IFACE_TYPE = {
    10: "1000base-t",  # 10 Mbps — map to closest
    100: "100base-tx",
    1000: "1000base-t",
    2500: "2.5gbase-t",
    5000: "5gbase-t",
    10000: "10gbase-t",
    25000: "25gbase-x-sfp28",
    40000: "40gbase-x-qsfpp",
}

# WiFi band codes → NetBox wireless interface types.
# NetBox's wireless interface type slugs are spelled "ieee802.11*" (no hyphen
# after "ieee"); the hyphenated form is not a valid choice.
RADIO_BAND_TO_TYPE = {
    "ng": "ieee802.11n",   # 2.4 GHz
    "na": "ieee802.11ac",  # 5 GHz
    "ac": "ieee802.11ac",  # 5 GHz 802.11ac
    "ax": "ieee802.11ax",  # WiFi 6
    "6e": "ieee802.11ax",  # WiFi 6E
    "be": "ieee802.11ax",  # WiFi 7 (closest mapping)
}

# WiFi band display names
RADIO_BAND_DISPLAY = {
    "ng": "2.4 GHz",
    "na": "5 GHz",
    "ac": "5 GHz",
    "ax": "WiFi 6",
    "6e": "6 GHz",
    "be": "WiFi 7",
}

# WiFi standard from band code (same values as RADIO_BAND_TO_TYPE; kept as a
# separate name for any external importers).
BAND_TO_STANDARD = dict(RADIO_BAND_TO_TYPE)


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

def load_dotenv(path: str = ".env") -> None:
    """Load ``KEY=VALUE`` pairs from *path* into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Surrounding quotes are stripped from values. Variables that are
    already set in the environment are left untouched. A missing file is
    silently ignored.
    """
    if not os.path.isfile(path):
        return
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, val = line.partition("=")
            os.environ.setdefault(key.strip(), val.strip().strip("\"'"))


def get_config() -> dict:
    """Return the collector configuration read from ``UNIFI_*`` env vars."""
    return {
        "host": os.environ.get("UNIFI_HOST", ""),
        "user": os.environ.get("UNIFI_USER", ""),
        "password": os.environ.get("UNIFI_PASSWORD", ""),
        "site_id": os.environ.get("UNIFI_SITE", "default"),
        "verify_ssl": os.environ.get("UNIFI_VERIFY_SSL", "false").lower() == "true",
        "is_udm": os.environ.get("UNIFI_IS_UDM", "true").lower() == "true",
        "netbox_site": os.environ.get("UNIFI_NETBOX_SITE", "main"),
    }


# ---------------------------------------------------------------------------
# Device reference helper
# ---------------------------------------------------------------------------

def _device_ref(name: str, model: str, role: str, site_name: str) -> Device:
    """Build a minimal Device used to reference a device from other entities."""
    return Device(
        name=name,
        device_type=DeviceType(
            model=model,
            manufacturer=Manufacturer(name="Ubiquiti"),
        ),
        role=DeviceRole(name=role),
        site=Site(name=site_name),
    )


# ---------------------------------------------------------------------------
# Connection
# ---------------------------------------------------------------------------

def connect_unifi(cfg: dict) -> UnifiController:
    """Create a UnifiController from *cfg*.

    Exits the process with status 1 when no host is configured. A host without
    an ``http`` prefix is assumed to be HTTPS.
    """
    host = cfg["host"]
    if not host:
        log.error("UNIFI_HOST not set")
        sys.exit(1)

    url = host if host.startswith("http") else f"https://{host}"
    log.info("Connecting to UniFi controller at %s ...", url)

    controller = UnifiController(
        controller_url=url,
        username=cfg["user"],
        password=cfg["password"],
        is_udm_pro=cfg["is_udm"],
        verify_ssl=cfg["verify_ssl"],
    )
    log.info("Connected to UniFi controller")
    return controller


# ---------------------------------------------------------------------------
# Data collection and entity building
# ---------------------------------------------------------------------------

def collect_all_entities(cfg: dict) -> list[Entity]:
    """Collect devices, cables, networks and WLANs and return Diode entities.

    Failures on an individual device/network/WLAN are logged and skipped so
    one bad record does not abort the run. If no devices are returned the
    function returns early with an empty list.
    """
    controller = connect_unifi(cfg)
    site_id = cfg["site_id"]
    site_name = cfg["netbox_site"]
    entities: list[Entity] = []

    # MAC → device info, filled while building devices and used afterwards to
    # resolve LLDP neighbours.
    mac_to_device: dict[str, dict] = {}
    # (device_name, local_port, lldp_entry)
    all_lldp_entries: list[tuple[str, str, dict]] = []

    # --- Devices (UDM, switches, APs) ---
    log.info("Fetching devices from site '%s' ...", site_id)
    try:
        devices = controller.get_unifi_site_device(site_id, raw=True)
    except Exception as exc:
        log.error("Failed to fetch devices: %s", exc)
        devices = []

    if not devices:
        log.warning("No devices returned from UniFi controller")
        return entities

    log.info("Found %d UniFi devices", len(devices))

    for dev in devices:
        try:
            dev_entities, dev_lldp = _build_device_entities(dev, site_name, mac_to_device)
            entities.extend(dev_entities)
            all_lldp_entries.extend(dev_lldp)
        except Exception as exc:
            dev_name = dev.get("name") or dev.get("mac", "?")
            log.error("Failed to process device %s: %s", dev_name, exc)

    # --- Cables from LLDP ---
    cable_entities = _build_cable_entities(all_lldp_entries, mac_to_device, site_name)
    entities.extend(cable_entities)

    # --- Networks / VLANs ---
    log.info("Fetching network configurations ...")
    try:
        networks = controller.get_unifi_site_networkconf(site_id, raw=True)
        log.info("Found %d networks", len(networks))
        for net in networks:
            try:
                entities.extend(_build_network_entities(net, site_name))
            except Exception as exc:
                log.error("Failed to process network %s: %s", net.get("name", "?"), exc)
    except Exception as exc:
        log.warning("Failed to fetch networks: %s", exc)

    # --- WLANs ---
    log.info("Fetching WLAN configurations ...")
    try:
        wlans = controller.get_unifi_site_wlanconf(site_id, raw=True)
        log.info("Found %d WLANs", len(wlans))
        for wlan in wlans:
            try:
                entities.extend(_build_wlan_entities(wlan, site_name))
            except Exception as exc:
                log.error("Failed to process WLAN %s: %s", wlan.get("name", "?"), exc)
    except Exception as exc:
        log.warning("Failed to fetch WLANs: %s", exc)

    return entities


# ---------------------------------------------------------------------------
# Device entity builder
# ---------------------------------------------------------------------------

def _build_device_entities(dev: dict, site_name: str, mac_to_device: dict) -> tuple[list[Entity], list]:
    """Build Device + Interface entities from a UniFi device dict.

    Side effect: registers the device in *mac_to_device* (keyed by lower-case
    MAC) so LLDP neighbours can later be resolved to known devices.

    Returns ``(entities, lldp_entries)`` where each LLDP entry is a
    ``(device_name, local_port, raw_entry)`` tuple.
    """
    entities: list[Entity] = []
    lldp_entries: list[tuple[str, str, dict]] = []

    # ``or`` fallbacks guard against keys that are present but None.
    mac = dev.get("mac") or ""
    name = dev.get("name") or dev.get("hostname") or mac
    model_code = dev.get("model") or "Unknown"
    model_name = dev.get("model_name") or dev.get("model_in_lts") or model_code
    dev_type = dev.get("type") or ""
    serial = dev.get("serial") or ""
    ip = dev.get("ip") or ""
    version = dev.get("version") or ""
    state = dev.get("state", 0)
    adopted = dev.get("adopted", False)

    role = DEVICE_TYPE_TO_ROLE.get(dev_type, "Network Device")
    platform = DEVICE_TYPE_TO_PLATFORM.get(dev_type, "Ubiquiti UniFi OS")

    # Device status
    if state == 1 and adopted:
        status = "active"
    elif state == 0:
        status = "offline"
    else:
        status = "planned"

    # Track MAC → device info for LLDP. Devices without a MAC are not
    # registered: an empty key would match LLDP entries whose chassis id is
    # empty and resolve them to the wrong device.
    if mac:
        mac_to_device[mac.lower()] = {
            "name": name,
            "model": model_name,
            "role": role,
        }

    # Custom fields
    custom_fields = {}
    if mac:
        custom_fields["unifi_mac"] = CustomFieldValue(text=mac)
    if version:
        custom_fields["unifi_firmware"] = CustomFieldValue(text=version)

    # --- Device entity ---
    device_kwargs = dict(
        name=name,
        device_type=DeviceType(
            model=model_name,
            manufacturer=Manufacturer(name="Ubiquiti"),
        ),
        role=DeviceRole(name=role),
        site=Site(name=site_name),
        platform=Platform(name=platform),
        serial=serial[:50] if serial else "",
        status=status,
        tags=["unifi"],
    )
    if custom_fields:
        device_kwargs["custom_fields"] = custom_fields

    entities.append(Entity(device=Device(**device_kwargs)))

    dev_ref = _device_ref(name, model_name, role, site_name)

    # --- Management IP ---
    if ip and ip != "0.0.0.0":
        # Management interface
        entities.append(Entity(interface=Interface(
            device=dev_ref,
            name="mgmt",
            type="other",
            mac_address=mac,
            enabled=True,
            description="Management interface",
            tags=["unifi"],
        )))
        entities.append(Entity(ip_address=IPAddress(
            address=f"{ip}/32",
            status="active",
            assigned_object_interface=Interface(
                device=dev_ref,
                name="mgmt",
                type="other",
            ),
            tags=["unifi"],
        )))

    # --- Switch ports (port_table) ---
    port_table = dev.get("port_table") or []
    for port in port_table:
        port_entities, port_lldp = _build_port_entities(port, dev_ref, name, mac_to_device)
        entities.extend(port_entities)
        lldp_entries.extend(port_lldp)

    # --- WiFi radios (radio_table) ---
    radio_table = dev.get("radio_table") or []
    radio_stats = dev.get("radio_table_stats") or []
    # Index stats by radio name so they can be merged into the radio data
    radio_stats_map = {}
    for rs in radio_stats:
        rname = rs.get("name") or rs.get("radio", "")
        if rname:
            radio_stats_map[rname] = rs

    for radio in radio_table:
        entities.extend(_build_radio_entities(radio, radio_stats_map, dev_ref))

    # --- Ethernet interfaces (ethernet_table) ---
    eth_table = dev.get("ethernet_table") or []
    for eth in eth_table:
        eth_mac = eth.get("mac", "")
        eth_name = eth.get("name") or eth.get("label", "")
        # "mgmt" is already emitted above; do not duplicate it.
        if eth_name and eth_name != "mgmt":
            num_ports = eth.get("num_port", 0)
            desc = f"{num_ports} ports" if num_ports else ""
            entities.append(Entity(interface=Interface(
                device=dev_ref,
                name=eth_name[:64],
                type="other",
                mac_address=eth_mac,
                enabled=True,
                description=desc,
                tags=["unifi"],
            )))

    # --- LLDP entries for cable discovery ---
    lldp_info = dev.get("lldp_table") or dev.get("lldp_info") or []
    for entry in lldp_info:
        local_port = entry.get("local_port_name") or f"port{entry.get('local_port_idx', '?')}"
        lldp_entries.append((name, local_port, entry))

    return entities, lldp_entries


# ---------------------------------------------------------------------------
# Port entity builder
# ---------------------------------------------------------------------------

def _build_port_entities(port: dict, dev_ref: Device, dev_name: str, mac_to_device: dict) -> tuple[list[Entity], list]:
    """Build Interface entities from a switch port_table entry.

    *mac_to_device* is accepted for signature compatibility and is not used.

    Returns ``(entities, lldp_entries)``; LLDP entries are taken from the
    port's own ``lldp_table`` when present.
    """
    entities: list[Entity] = []
    lldp_entries: list[tuple[str, str, dict]] = []

    port_idx = port.get("port_idx", 0)
    port_name = port.get("name") or f"Port {port_idx}"
    mac = port.get("mac", "")
    speed = int(port.get("speed", 0) or 0)
    is_uplink = port.get("is_uplink", False)
    enabled = port.get("enable", True)
    poe_enable = port.get("poe_enable", False)
    poe_power = port.get("poe_power") or ""
    media = port.get("media") or ""
    sfp_found = port.get("sfp_found", False)

    # Determine interface type from speed and media. The highest threshold
    # must be tested first, otherwise 25G links fall into the 10G branch.
    if sfp_found or "SFP" in media.upper():
        if speed >= 25000:
            iface_type = "25gbase-x-sfp28"
        elif speed >= 10000:
            iface_type = "10gbase-x-sfpp"
        else:
            iface_type = "1000base-x-sfp"
    else:
        iface_type = SPEED_TO_IFACE_TYPE.get(speed, "1000base-t")

    # Description parts
    desc_parts = []
    if is_uplink:
        desc_parts.append("Uplink")
    if poe_enable:
        power_str = f" ({poe_power}W)" if poe_power else ""
        desc_parts.append(f"PoE{power_str}")

    entities.append(Entity(interface=Interface(
        device=dev_ref,
        name=port_name[:64],
        type=iface_type,
        mac_address=mac,
        speed=speed * 1000 if speed else 0,  # Mbps → Kbps
        enabled=enabled,
        description=", ".join(desc_parts)[:200] if desc_parts else "",
        tags=["unifi"],
    )))

    # Collect LLDP from port if present
    port_lldp = port.get("lldp_table") or []
    for entry in port_lldp:
        lldp_entries.append((dev_name, port_name, entry))

    return entities, lldp_entries


# ---------------------------------------------------------------------------
# Radio entity builder
# ---------------------------------------------------------------------------

def _build_radio_entities(radio: dict, stats_map: dict, dev_ref: Device) -> list[Entity]:
    """Build Interface entities from a WiFi radio_table entry.

    *stats_map* maps radio name → stats dict; when a matching entry has a
    client count it is appended to the interface description.
    """
    entities: list[Entity] = []

    radio_name = radio.get("name") or radio.get("radio", "wifi0")
    band = radio.get("radio", "ng")
    channel = radio.get("channel") or ""
    tx_power = radio.get("tx_power") or ""
    tx_power_mode = radio.get("tx_power_mode") or ""
    ht = radio.get("ht") or ""

    band_display = RADIO_BAND_DISPLAY.get(band, band)
    iface_type = RADIO_BAND_TO_TYPE.get(band, "ieee802.11n")

    desc_parts = [band_display]
    if channel:
        desc_parts.append(f"ch{channel}")
    if tx_power:
        desc_parts.append(f"{tx_power}dBm")
    elif tx_power_mode:
        # A mode string (e.g. "auto") is not a dBm value; label it as a mode
        # instead of rendering something like "autodBm".
        desc_parts.append(f"tx-power {tx_power_mode}")
    if ht:
        desc_parts.append(ht)

    # Merge stats if available
    stats = stats_map.get(radio_name, {})
    num_sta = stats.get("user-num_sta") or stats.get("user_num_sta", 0)
    if num_sta:
        desc_parts.append(f"{num_sta} clients")

    entities.append(Entity(interface=Interface(
        device=dev_ref,
        name=radio_name[:64],
        type=iface_type,
        enabled=True,
        description=", ".join(desc_parts)[:200],
        tags=["unifi", "wireless"],
    )))

    return entities


# ---------------------------------------------------------------------------
# Cable entity builder (from LLDP)
# ---------------------------------------------------------------------------

def _build_cable_entities(lldp_entries: list[tuple[str, str, dict]],
                          mac_to_device: dict,
                          site_name: str) -> list[Entity]:
    """Build Cable entities from LLDP neighbor data, deduplicating pairs.

    Each link is keyed by the sorted pair of ``(device, port)`` endpoints so
    the same cable reported from both ends is emitted once. Neighbours that
    cannot be resolved through *mac_to_device* are referenced with model
    "Unknown" and role "Network Device". Entries without a remote port
    identifier are skipped because a cable cannot terminate on an unnamed
    interface.
    """
    entities: list[Entity] = []
    seen_cables: set[tuple] = set()

    # Name → device info, built once instead of scanning mac_to_device for
    # every cable. On duplicate names the last entry wins.
    name_to_info = {info["name"]: info for info in mac_to_device.values()}

    def _ref_for(device_name: str) -> Device:
        info = name_to_info.get(device_name)
        model = info["model"] if info else "Unknown"
        role = info["role"] if info else "Network Device"
        return _device_ref(device_name, model, role, site_name)

    for dev_name, local_port, entry in lldp_entries:
        chassis_id = (entry.get("chassis_id") or "").lower().replace("-", ":")
        remote_port = entry.get("port_id") or entry.get("port_descr") or ""
        chassis_name = entry.get("chassis_descr") or ""

        if not chassis_id and not chassis_name:
            continue
        if not remote_port:
            log.debug("Skipping LLDP entry on %s:%s without remote port", dev_name, local_port)
            continue

        # Try to resolve remote device by MAC
        remote_dev = mac_to_device.get(chassis_id, {})
        remote_name = remote_dev.get("name") or chassis_name or chassis_id

        # Dedup: sorted pair key
        pair = tuple(sorted([
            (dev_name, local_port),
            (remote_name, remote_port),
        ]))
        if pair in seen_cables:
            continue
        seen_cables.add(pair)

        (a_name, a_port), (b_name, b_port) = pair

        entities.append(Entity(cable=Cable(
            a_terminations=[CableTermination(
                termination=GenericObject(
                    object_interface=Interface(
                        device=_ref_for(a_name),
                        name=a_port[:64],
                    )
                ),
            )],
            b_terminations=[CableTermination(
                termination=GenericObject(
                    object_interface=Interface(
                        device=_ref_for(b_name),
                        name=b_port[:64],
                    )
                ),
            )],
            status="connected",
            tags=["unifi", "lldp"],
        )))
        log.debug("Cable: %s:%s ↔ %s:%s", a_name, a_port, b_name, b_port)

    log.info("Built %d cable entities from LLDP", len(entities))
    return entities


# ---------------------------------------------------------------------------
# Network / VLAN entity builder
# ---------------------------------------------------------------------------

def _build_network_entities(net: dict, site_name: str) -> list[Entity]:
    """Build VLAN + Prefix entities from a UniFi network config.

    A VLAN is emitted only when tagging is enabled and the id is in 1..4094.
    A Prefix is emitted when ``ip_subnet`` parses as a network; host bits are
    cleared first. Networks without a name produce no entities.
    """
    entities: list[Entity] = []

    name = net.get("name", "")
    purpose = net.get("purpose", "")
    vlan_id = net.get("vlan")
    vlan_enabled = net.get("vlan_enabled", False)
    subnet = net.get("ip_subnet", "")
    enabled = net.get("enabled", True)

    if not name:
        return entities

    # VLAN entity (if VLAN tagging is enabled)
    if vlan_enabled and vlan_id:
        try:
            vid = int(vlan_id)
        except (ValueError, TypeError):
            vid = None

        if vid and 1 <= vid <= 4094:
            entities.append(Entity(vlan=VLAN(
                vid=vid,
                name=name[:64],
                group=VLANGroup(name="UniFi"),
                site=Site(name=site_name),
                status="active" if enabled else "reserved",
                tags=["unifi"],
            )))

    # Prefix entity (if subnet is defined)
    if subnet and "/" in subnet:
        # ip_subnet may carry host bits (e.g. a gateway address such as
        # 192.168.1.1/24); a prefix must be the network address, so normalize.
        try:
            network = ipaddress.ip_network(subnet.strip(), strict=False)
        except ValueError:
            log.warning("Skipping invalid subnet %r for network %s", subnet, name)
        else:
            entities.append(Entity(prefix=Prefix(
                prefix=str(network),
                scope_site=Site(name=site_name),
                status="active",
                description=f"UniFi network: {name} ({purpose})"[:200],
                tags=["unifi"],
            )))

    return entities


# ---------------------------------------------------------------------------
# WLAN entity builder
# ---------------------------------------------------------------------------

def _wlan_auth_type(security: str) -> str:
    """Map a UniFi ``security`` value to a NetBox wireless auth type slug.

    NetBox accepts "open", "wep", "wpa-personal" and "wpa-enterprise".
    Enterprise is tested before the personal fallback so an enterprise WLAN
    is never classified as personal. Unknown values fall back to
    "wpa-personal".
    """
    sec = security.lower()
    if sec == "open":
        return "open"
    if sec == "wep":
        return "wep"
    if "wpa-enterprise" in sec or "wpaeap" in sec:
        return "wpa-enterprise"
    return "wpa-personal"


def _build_wlan_entities(wlan: dict, site_name: str) -> list[Entity]:
    """Build WirelessLAN entities from a UniFi WLAN config.

    *site_name* is accepted for signature symmetry with the other builders
    and is not used. WLANs without a name (SSID) produce no entities.
    """
    entities: list[Entity] = []

    ssid = wlan.get("name", "")  # UniFi uses 'name' as the SSID
    enabled = wlan.get("enabled", True)
    security = wlan.get("security") or ""
    hide_ssid = wlan.get("hide_ssid", False)
    wpa3 = wlan.get("wpa3_support", False)

    if not ssid:
        return entities

    auth_type = _wlan_auth_type(security)

    # UniFi WLANs link to networks via networkconf_id, but we don't have
    # easy access to the VLAN ID without cross-referencing, so no VLAN is set.
    desc_parts = []
    if hide_ssid:
        desc_parts.append("Hidden SSID")
    if security:
        desc_parts.append(f"Security: {security}")
    if wpa3:
        # The NetBox auth type does not distinguish WPA generations; keep the
        # information in the description.
        desc_parts.append("WPA3")

    entities.append(Entity(wireless_lan=WirelessLAN(
        ssid=ssid,
        group=WirelessLANGroup(name="UniFi"),
        status="active" if enabled else "disabled",
        auth_type=auth_type,
        description=", ".join(desc_parts)[:200] if desc_parts else "",
        tags=["unifi"],
    )))

    return entities


# ---------------------------------------------------------------------------
# Ingest
# ---------------------------------------------------------------------------

def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
    """Send *entities* to Diode, or just log them when *dry_run* is set.

    Connection settings come from ``DIODE_TARGET``, ``DIODE_CLIENT_ID`` /
    ``INGESTER_CLIENT_ID`` and ``DIODE_CLIENT_SECRET`` /
    ``INGESTER_CLIENT_SECRET``. Exits with status 1 when a real ingest is
    requested without a client secret.
    """
    if not entities:
        log.warning("No entities to ingest")
        return

    target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
    client_id = os.environ.get("DIODE_CLIENT_ID", os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
    client_secret = os.environ.get("DIODE_CLIENT_SECRET", os.environ.get("INGESTER_CLIENT_SECRET", ""))

    if dry_run:
        log.info("DRY RUN: %d entities would be ingested", len(entities))
        for i, e in enumerate(entities):
            log.info("  [%d] %s", i, e)
        return

    if not client_secret:
        log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
        sys.exit(1)

    log.info("Ingesting %d entities to %s ...", len(entities), target)
    with DiodeClient(
        target=target,
        client_id=client_id,
        client_secret=client_secret,
        app_name="unifi-collector",
        app_version="0.1.0",
    ) as client:
        resp = client.ingest(entities=entities)
        if resp.errors:
            log.error("Ingestion errors: %s", resp.errors)
        else:
            log.info("Ingestion successful")


def main():
    """CLI entry point: parse arguments, collect entities and ingest them."""
    parser = argparse.ArgumentParser(description="UniFi collector for NetBox")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"])
    parser.add_argument("--env-file", default=".env")
    args = parser.parse_args()

    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )

    load_dotenv(args.env_file)
    cfg = get_config()

    entities = collect_all_entities(cfg)
    log.info("Total entities: %d", len(entities))

    ingest_entities(entities, dry_run=args.dry_run)
    log.info("Done!")


if __name__ == "__main__":
    main()