netbox-diode-project/collectors/cable_exporter.py
sam 1445e06f34 Add LLDP cable exporter for NetBox REST API import
Creates cables from LLDP neighbor data by collecting LLDP from all
inventory devices, validating both endpoints against the NetBox inventory
(devices + interfaces), deduplicating bidirectional links, and importing
via the NetBox REST API. Handles interface name normalization across
vendors (NOS space-delimited names, abbreviated LLDP names, etc.).

First run: 30 cables created across Cisco IOS, IOS-XR, Brocade ICX/VDX,
and CML lab routers. Idempotent on re-run (skips already-cabled interfaces).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 18:03:50 -07:00

576 lines
22 KiB
Python

#!/usr/bin/env python3
"""LLDP Cable Exporter — collect LLDP neighbors and create cables in NetBox.
Connects to devices via SSH (NAPALM or Netmiko), collects LLDP neighbor data,
validates both endpoints against the NetBox inventory, deduplicates bidirectional
links, and creates cables via the NetBox REST API.
Usage:
python collectors/cable_exporter.py -i collectors/inventory.yaml --dry-run
python collectors/cable_exporter.py -i collectors/inventory.yaml
python collectors/cable_exporter.py -i collectors/inventory.yaml --csv-only
"""
from __future__ import annotations
import argparse
import csv
import logging
import os
import re
import sys
from pathlib import Path
import requests
from dotenv import load_dotenv
# Import shared utilities from network_collector
from network_collector import (
NETMIKO_ONLY_DRIVERS,
_netmiko_parse_lldp,
connect_device,
connect_netmiko,
load_inventory,
merge_device_config,
normalize_interface_name,
)
log = logging.getLogger("cable-exporter")
# Interface types that cannot be cable endpoints in NetBox
UNCABLEABLE_TYPES = {"virtual", "lag"}
# ---------------------------------------------------------------------------
# NetBox API helpers
# ---------------------------------------------------------------------------
class NetBoxClient:
"""Thin wrapper around the NetBox REST API."""
def __init__(self, url: str, token: str):
self.base = url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
})
def _get_all(self, endpoint: str, params: dict | None = None) -> list[dict]:
"""Paginate through all results for an endpoint."""
results = []
url = f"{self.base}{endpoint}"
p = dict(params or {})
p.setdefault("limit", 250)
while url:
resp = self.session.get(url, params=p)
resp.raise_for_status()
data = resp.json()
results.extend(data.get("results", []))
url = data.get("next")
p = {} # next URL already has params
return results
def get_devices(self, site: str | None = None) -> list[dict]:
params = {}
if site:
params["site"] = site
return self._get_all("/api/dcim/devices/", params)
def get_interfaces(self, device_id: int) -> list[dict]:
return self._get_all("/api/dcim/interfaces/", {"device_id": device_id})
def get_cables(self) -> list[dict]:
return self._get_all("/api/dcim/cables/")
def ensure_tag(self, name: str, slug: str | None = None) -> dict:
"""Create tag if it doesn't exist, return tag dict."""
slug = slug or name
existing = self._get_all("/api/extras/tags/", {"name": name})
if existing:
return existing[0]
resp = self.session.post(
f"{self.base}/api/extras/tags/",
json={"name": name, "slug": slug},
)
resp.raise_for_status()
return resp.json()
def create_cable(self, a_iface_id: int, b_iface_id: int,
status: str = "connected", tag_ids: list[int] | None = None) -> dict:
"""Create a cable between two interface IDs."""
payload = {
"a_terminations": [{"object_type": "dcim.interface", "object_id": a_iface_id}],
"b_terminations": [{"object_type": "dcim.interface", "object_id": b_iface_id}],
"status": status,
}
if tag_ids:
payload["tags"] = [{"id": tid} for tid in tag_ids]
resp = self.session.post(f"{self.base}/api/dcim/cables/", json=payload)
resp.raise_for_status()
return resp.json()
# ---------------------------------------------------------------------------
# NetBox inventory cache
# ---------------------------------------------------------------------------
def build_netbox_cache(nb: NetBoxClient, site: str) -> dict:
"""Build lookup tables from NetBox API.
Returns:
{
"devices": {device_name: device_id},
"interfaces": {(device_id, iface_name): {"id": N, "type": "..."}},
"cabled_ifaces": set of interface IDs that already have cables,
}
"""
cache = {"devices": {}, "interfaces": {}, "cabled_ifaces": set()}
# Load devices at target site
devices = nb.get_devices(site=site)
log.info(" Loaded %d devices from site '%s'", len(devices), site)
for dev in devices:
name = dev["name"]
if name in cache["devices"]:
log.debug(" Duplicate device name '%s' at site '%s', keeping first", name, site)
continue
cache["devices"][name] = dev["id"]
# Load interfaces for each device
iface_count = 0
for dev_name, dev_id in cache["devices"].items():
ifaces = nb.get_interfaces(dev_id)
for iface in ifaces:
iface_type = iface["type"]["value"] if iface.get("type") else "other"
cache["interfaces"][(dev_id, iface["name"])] = {
"id": iface["id"],
"type": iface_type,
}
iface_count += 1
log.info(" Loaded %d interfaces across %d devices", iface_count, len(cache["devices"]))
# Load existing cables to find already-cabled interfaces
cables = nb.get_cables()
for cable in cables:
for term in cable.get("a_terminations", []):
obj = term.get("object")
if obj:
cache["cabled_ifaces"].add(obj["id"])
for term in cable.get("b_terminations", []):
obj = term.get("object")
if obj:
cache["cabled_ifaces"].add(obj["id"])
log.info(" Found %d existing cables (%d cabled interfaces)",
len(cables), len(cache["cabled_ifaces"]))
return cache
# ---------------------------------------------------------------------------
# LLDP collection
# ---------------------------------------------------------------------------
def collect_lldp_from_device(device_cfg: dict) -> dict:
"""Connect to a single device and collect only LLDP neighbors.
Returns NAPALM-format lldp_neighbors dict or empty dict on failure.
"""
host = device_cfg["host"]
driver = device_cfg["driver"]
username = device_cfg["username"]
password = device_cfg["password"]
secret = device_cfg.get("secret", "")
timeout = device_cfg.get("timeout", 60)
use_netmiko = driver in NETMIKO_ONLY_DRIVERS
lldp = {}
if use_netmiko:
log.debug(" Using Netmiko fallback for %s (driver=%s)", host, driver)
try:
conn = connect_netmiko(host, driver, username, password, secret, timeout)
lldp = _netmiko_parse_lldp(conn, driver)
conn.disconnect()
except Exception as exc:
log.error(" Netmiko LLDP collection failed for %s: %s", host, exc)
else:
try:
dev = connect_device(host, driver, username, password, secret, timeout)
try:
lldp = dev.get_lldp_neighbors_detail()
except Exception as exc:
log.warning(" NAPALM get_lldp_neighbors_detail() failed for %s: %s", host, exc)
dev.close()
except Exception as exc:
log.error(" NAPALM connection failed for %s: %s", host, exc)
total = sum(len(v) for v in lldp.values())
if total:
log.info(" %s: %d LLDP neighbors", host, total)
return lldp
def collect_all_lldp(inventory: dict) -> dict:
"""Collect LLDP from all devices in inventory.
Returns {device_hostname: lldp_neighbors_dict} where hostname is the
inventory 'host' field (IP or name). We'll map to NetBox device names later.
"""
defaults = inventory.get("defaults", {})
all_lldp = {}
devices = inventory["devices"]
for entry in devices:
cfg = merge_device_config(entry, defaults)
host = cfg["host"]
log.info("Collecting LLDP from %s (driver=%s)...", host, cfg["driver"])
lldp = collect_lldp_from_device(cfg)
if lldp:
# Store with the inventory host key — we'll resolve to NetBox name later
all_lldp[host] = {"lldp": lldp, "config": cfg}
return all_lldp
# ---------------------------------------------------------------------------
# LLDP → Cable matching
# ---------------------------------------------------------------------------
_ABBREV_TO_LONG = {
"Gi": "GigabitEthernet", "Te": "TenGigabitEthernet",
"Fa": "FastEthernet", "Et": "Ethernet",
"Fo": "FortyGigabitEthernet", "Hu": "HundredGigE",
"Lo": "Loopback", "Vl": "Vlan", "Po": "Port-channel",
"Mg": "MgmtEth", "Tu": "Tunnel", "Se": "Serial",
}
_LONG_TO_ABBREV = {v: k for k, v in _ABBREV_TO_LONG.items()}
def lookup_interface(iface_cache: dict, dev_id: int, iface_name: str) -> dict | None:
"""Look up an interface with fallback name variations.
Tries multiple naming conventions:
1. Exact match
2. With/without space after type prefix
3. Abbreviated ↔ long form (Te ↔ TenGigabitEthernet)
"""
# Exact match
result = iface_cache.get((dev_id, iface_name))
if result:
return result
m = re.match(r"^([A-Za-z]+)\s*(\d.*)$", iface_name)
if not m:
return None
prefix, rest = m.group(1), m.group(2)
# Try with/without space
for fmt in [f"{prefix} {rest}", f"{prefix}{rest}"]:
result = iface_cache.get((dev_id, fmt))
if result:
return result
# Try long form ↔ abbreviated form
alt_prefix = _LONG_TO_ABBREV.get(prefix) or _ABBREV_TO_LONG.get(prefix)
if alt_prefix:
for fmt in [f"{alt_prefix} {rest}", f"{alt_prefix}{rest}"]:
result = iface_cache.get((dev_id, fmt))
if result:
return result
return None
def build_cable_list(all_lldp: dict, cache: dict, nb: NetBoxClient) -> list[dict]:
"""Build validated cable list from LLDP data + NetBox cache.
Returns list of cable dicts:
[{"a_device": str, "a_iface": str, "a_iface_id": int,
"b_device": str, "b_iface": str, "b_iface_id": int,
"skip_reason": str | None}]
"""
device_cache = cache["devices"]
iface_cache = cache["interfaces"]
cabled_ifaces = cache["cabled_ifaces"]
# Build IP → device_name mapping by querying NetBox for primary IPs
ip_to_device = _build_ip_to_device_map(nb, device_cache)
# Build hostname → device_name (case-insensitive) for LLDP remote_system_name
# Include both full names and short names (stripped FQDN) for matching
name_lower = {}
for name in device_cache:
name_lower[name.lower()] = name
# Also index short name (strip FQDN)
if "." in name:
short = name.split(".")[0].lower()
if short not in name_lower:
name_lower[short] = name
seen_links = set() # for bidirectional dedup
cables = []
stats = {"total_pairs": 0, "matched": 0, "skipped_device": 0,
"skipped_iface": 0, "skipped_type": 0, "skipped_cabled": 0, "skipped_dup": 0}
for host, data in all_lldp.items():
lldp = data["lldp"]
# Resolve local device name from IP
local_name = ip_to_device.get(host)
if not local_name:
log.warning(" %s: not found in NetBox, skipping all its neighbors", host)
continue
local_dev_id = device_cache[local_name]
for local_iface_raw, neighbors in lldp.items():
for neighbor in neighbors:
stats["total_pairs"] += 1
# -- Resolve remote device --
remote_sys = neighbor.get("remote_system_name", "").strip()
# Strip FQDN
if "." in remote_sys:
remote_sys = remote_sys.split(".")[0]
# Strip IOS-XR RP prefix
if ":" in remote_sys:
remote_sys = remote_sys.rsplit(":", 1)[-1]
remote_name = name_lower.get(remote_sys.lower())
if not remote_name:
# Try remote_system_description as fallback
desc = neighbor.get("remote_system_description", "").strip()
if desc:
short = desc.split(".")[0] if "." in desc else desc
remote_name = name_lower.get(short.lower())
if not remote_name:
stats["skipped_device"] += 1
log.debug(" Skip: remote device '%s' not in NetBox", remote_sys)
continue
remote_dev_id = device_cache[remote_name]
# -- Resolve local interface --
local_iface = normalize_interface_name(local_iface_raw.strip())
local_iface_info = lookup_interface(iface_cache, local_dev_id, local_iface)
if not local_iface_info:
stats["skipped_iface"] += 1
log.debug(" Skip: %s:%s not found in NetBox", local_name, local_iface)
continue
# -- Resolve remote interface --
remote_port_raw = (neighbor.get("remote_port", "")
or neighbor.get("remote_port_description", "")).strip()
remote_iface = normalize_interface_name(remote_port_raw)
remote_iface_info = lookup_interface(iface_cache, remote_dev_id, remote_iface)
if not remote_iface_info:
stats["skipped_iface"] += 1
log.debug(" Skip: %s:%s not found in NetBox", remote_name, remote_iface)
continue
# -- Check interface types (no LAG / virtual) --
if local_iface_info["type"] in UNCABLEABLE_TYPES:
stats["skipped_type"] += 1
log.debug(" Skip: %s:%s is type '%s' (uncableable)",
local_name, local_iface, local_iface_info["type"])
continue
if remote_iface_info["type"] in UNCABLEABLE_TYPES:
stats["skipped_type"] += 1
log.debug(" Skip: %s:%s is type '%s' (uncableable)",
remote_name, remote_iface, remote_iface_info["type"])
continue
# -- Check if already cabled --
a_id = local_iface_info["id"]
b_id = remote_iface_info["id"]
if a_id in cabled_ifaces or b_id in cabled_ifaces:
stats["skipped_cabled"] += 1
log.debug(" Skip: %s:%s or %s:%s already cabled",
local_name, local_iface, remote_name, remote_iface)
continue
# -- Bidirectional dedup --
link_key = tuple(sorted([a_id, b_id]))
if link_key in seen_links:
stats["skipped_dup"] += 1
continue
seen_links.add(link_key)
cables.append({
"a_device": local_name,
"a_iface": local_iface,
"a_iface_id": a_id,
"b_device": remote_name,
"b_iface": remote_iface,
"b_iface_id": b_id,
})
stats["matched"] += 1
log.info("Cable matching summary:")
log.info(" Total LLDP pairs: %d", stats["total_pairs"])
log.info(" Matched cables: %d", stats["matched"])
log.info(" Skipped (device not found): %d", stats["skipped_device"])
log.info(" Skipped (interface not found): %d", stats["skipped_iface"])
log.info(" Skipped (uncableable type): %d", stats["skipped_type"])
log.info(" Skipped (already cabled): %d", stats["skipped_cabled"])
log.info(" Skipped (bidirectional dup): %d", stats["skipped_dup"])
return cables
def _build_ip_to_device_map(nb: NetBoxClient, device_cache: dict) -> dict:
"""Build {ip_address: device_name} mapping from NetBox interface IPs.
This lets us map inventory hosts (which use IPs) to NetBox device names.
"""
ip_to_device = {}
# Query all IP addresses and map to their assigned device
ips = nb._get_all("/api/ipam/ip-addresses/", {"limit": 250})
for ip_entry in ips:
addr = ip_entry.get("address", "") # "10.100.0.100/24"
ip_only = addr.split("/")[0] if "/" in addr else addr
assigned = ip_entry.get("assigned_object")
if assigned and assigned.get("device"):
dev_name = assigned["device"]["name"]
if dev_name in device_cache:
ip_to_device[ip_only] = dev_name
log.info(" Built IP→device map: %d IPs mapped", len(ip_to_device))
return ip_to_device
# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------
def write_csv(cables: list[dict], output_path: str, site: str):
"""Write cables to CSV in NetBox bulk import format."""
with open(output_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow([
"side_a_device", "side_a_type", "side_a_name",
"side_b_device", "side_b_type", "side_b_name",
"side_a_site", "side_b_site", "status", "tags",
])
for cable in cables:
writer.writerow([
cable["a_device"], "dcim.interface", cable["a_iface"],
cable["b_device"], "dcim.interface", cable["b_iface"],
site, site, "connected", "lldp-discovered",
])
log.info("CSV written to %s (%d cables)", output_path, len(cables))
def create_cables_via_api(nb: NetBoxClient, cables: list[dict], tag_id: int | None) -> int:
"""Create cables via NetBox REST API. Returns count of successfully created cables."""
created = 0
tag_ids = [tag_id] if tag_id else None
for cable in cables:
try:
nb.create_cable(cable["a_iface_id"], cable["b_iface_id"], tag_ids=tag_ids)
log.info(" Created: %s:%s <-> %s:%s",
cable["a_device"], cable["a_iface"],
cable["b_device"], cable["b_iface"])
created += 1
except requests.HTTPError as exc:
body = ""
try:
body = exc.response.json()
except Exception:
body = exc.response.text[:200]
log.error(" Failed: %s:%s <-> %s:%s%s",
cable["a_device"], cable["a_iface"],
cable["b_device"], cable["b_iface"], body)
return created
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="LLDP Cable Exporter for NetBox")
parser.add_argument("-i", "--inventory", required=True, help="Path to inventory.yaml")
parser.add_argument("--dry-run", action="store_true", help="Show cables without creating them")
parser.add_argument("--csv-only", action="store_true", help="Only generate CSV, don't call API")
parser.add_argument("--output", default="cables_export.csv", help="CSV output path")
parser.add_argument("--site", default="main", help="NetBox site to target (default: main)")
parser.add_argument("--log-level", default="INFO", help="Log level (DEBUG/INFO/WARNING)")
parser.add_argument("--env-file", default=".env", help="Path to .env file")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level.upper(), logging.INFO),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
# Load environment
env_path = Path(args.env_file)
if env_path.exists():
load_dotenv(env_path)
else:
log.warning("Env file %s not found, using environment variables", args.env_file)
netbox_url = os.environ.get("NETBOX_API_URL")
netbox_token = os.environ.get("NETBOX_API_TOKEN")
if not netbox_url or not netbox_token:
log.error("NETBOX_API_URL and NETBOX_API_TOKEN must be set in .env")
sys.exit(1)
# Load inventory
inventory = load_inventory(args.inventory)
device_count = len(inventory.get("devices", []))
log.info("Loaded %d devices from inventory", device_count)
# Connect to NetBox API
nb = NetBoxClient(netbox_url, netbox_token)
log.info("Building NetBox inventory cache (site=%s)...", args.site)
cache = build_netbox_cache(nb, args.site)
# Collect LLDP from all devices
log.info("Collecting LLDP neighbors from devices...")
all_lldp = collect_all_lldp(inventory)
connected = len(all_lldp)
log.info("LLDP collected from %d/%d devices", connected, device_count)
# Match and validate cables
log.info("Matching LLDP data against NetBox inventory...")
cables = build_cable_list(all_lldp, cache, nb)
if not cables:
log.info("No valid cables to create.")
return
# Output
if args.dry_run:
log.info("DRY RUN — %d cables would be created:", len(cables))
for c in cables:
log.info(" %s:%s <-> %s:%s", c["a_device"], c["a_iface"],
c["b_device"], c["b_iface"])
write_csv(cables, args.output, args.site)
return
# Write CSV
write_csv(cables, args.output, args.site)
if args.csv_only:
log.info("CSV-only mode — skipping API import")
return
# Ensure tag exists
tag = nb.ensure_tag("lldp-discovered")
tag_id = tag["id"]
# Create cables via API
log.info("Creating %d cables via NetBox API...", len(cables))
created = create_cables_via_api(nb, cables, tag_id)
log.info("Done! Created %d/%d cables.", created, len(cables))
if __name__ == "__main__":
main()