#!/usr/bin/env python3 """LLDP Cable Exporter — collect LLDP neighbors and create cables in NetBox. Connects to devices via SSH (NAPALM or Netmiko), collects LLDP neighbor data, validates both endpoints against the NetBox inventory, deduplicates bidirectional links, and creates cables via the NetBox REST API. Usage: python collectors/cable_exporter.py -i collectors/inventory.yaml --dry-run python collectors/cable_exporter.py -i collectors/inventory.yaml python collectors/cable_exporter.py -i collectors/inventory.yaml --csv-only """ from __future__ import annotations import argparse import csv import logging import os import re import sys from pathlib import Path import requests from dotenv import load_dotenv # Import shared utilities from network_collector from network_collector import ( NETMIKO_ONLY_DRIVERS, _netmiko_parse_lldp, connect_device, connect_netmiko, load_inventory, merge_device_config, normalize_interface_name, ) log = logging.getLogger("cable-exporter") # Interface types that cannot be cable endpoints in NetBox UNCABLEABLE_TYPES = {"virtual", "lag"} # --------------------------------------------------------------------------- # NetBox API helpers # --------------------------------------------------------------------------- class NetBoxClient: """Thin wrapper around the NetBox REST API.""" def __init__(self, url: str, token: str): self.base = url.rstrip("/") self.session = requests.Session() self.session.headers.update({ "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Accept": "application/json", }) def _get_all(self, endpoint: str, params: dict | None = None) -> list[dict]: """Paginate through all results for an endpoint.""" results = [] url = f"{self.base}{endpoint}" p = dict(params or {}) p.setdefault("limit", 250) while url: resp = self.session.get(url, params=p) resp.raise_for_status() data = resp.json() results.extend(data.get("results", [])) url = data.get("next") p = {} # next URL already has params return results def get_devices(self, site: str | None = None) -> list[dict]: params = {} if site: params["site"] = site return self._get_all("/api/dcim/devices/", params) def get_interfaces(self, device_id: int) -> list[dict]: return self._get_all("/api/dcim/interfaces/", {"device_id": device_id}) def get_cables(self) -> list[dict]: return self._get_all("/api/dcim/cables/") def ensure_tag(self, name: str, slug: str | None = None) -> dict: """Create tag if it doesn't exist, return tag dict.""" slug = slug or name existing = self._get_all("/api/extras/tags/", {"name": name}) if existing: return existing[0] resp = self.session.post( f"{self.base}/api/extras/tags/", json={"name": name, "slug": slug}, ) resp.raise_for_status() return resp.json() def create_cable(self, a_iface_id: int, b_iface_id: int, status: str = "connected", tag_ids: list[int] | None = None) -> dict: """Create a cable between two interface IDs.""" payload = { "a_terminations": [{"object_type": "dcim.interface", "object_id": a_iface_id}], "b_terminations": [{"object_type": "dcim.interface", "object_id": b_iface_id}], "status": status, } if tag_ids: payload["tags"] = [{"id": tid} for tid in tag_ids] resp = self.session.post(f"{self.base}/api/dcim/cables/", json=payload) resp.raise_for_status() return resp.json() # --------------------------------------------------------------------------- # NetBox inventory cache # --------------------------------------------------------------------------- def build_netbox_cache(nb: NetBoxClient, site: str) -> dict: """Build lookup tables from NetBox API. Returns: { "devices": {device_name: device_id}, "interfaces": {(device_id, iface_name): {"id": N, "type": "..."}}, "cabled_ifaces": set of interface IDs that already have cables, } """ cache = {"devices": {}, "interfaces": {}, "cabled_ifaces": set()} # Load devices at target site devices = nb.get_devices(site=site) log.info(" Loaded %d devices from site '%s'", len(devices), site) for dev in devices: name = dev["name"] if name in cache["devices"]: log.debug(" Duplicate device name '%s' at site '%s', keeping first", name, site) continue cache["devices"][name] = dev["id"] # Load interfaces for each device iface_count = 0 for dev_name, dev_id in cache["devices"].items(): ifaces = nb.get_interfaces(dev_id) for iface in ifaces: iface_type = iface["type"]["value"] if iface.get("type") else "other" cache["interfaces"][(dev_id, iface["name"])] = { "id": iface["id"], "type": iface_type, } iface_count += 1 log.info(" Loaded %d interfaces across %d devices", iface_count, len(cache["devices"])) # Load existing cables to find already-cabled interfaces cables = nb.get_cables() for cable in cables: for term in cable.get("a_terminations", []): obj = term.get("object") if obj: cache["cabled_ifaces"].add(obj["id"]) for term in cable.get("b_terminations", []): obj = term.get("object") if obj: cache["cabled_ifaces"].add(obj["id"]) log.info(" Found %d existing cables (%d cabled interfaces)", len(cables), len(cache["cabled_ifaces"])) return cache # --------------------------------------------------------------------------- # LLDP collection # --------------------------------------------------------------------------- def collect_lldp_from_device(device_cfg: dict) -> dict: """Connect to a single device and collect only LLDP neighbors. Returns NAPALM-format lldp_neighbors dict or empty dict on failure. """ host = device_cfg["host"] driver = device_cfg["driver"] username = device_cfg["username"] password = device_cfg["password"] secret = device_cfg.get("secret", "") timeout = device_cfg.get("timeout", 60) use_netmiko = driver in NETMIKO_ONLY_DRIVERS lldp = {} if use_netmiko: log.debug(" Using Netmiko fallback for %s (driver=%s)", host, driver) try: conn = connect_netmiko(host, driver, username, password, secret, timeout) lldp = _netmiko_parse_lldp(conn, driver) conn.disconnect() except Exception as exc: log.error(" Netmiko LLDP collection failed for %s: %s", host, exc) else: try: dev = connect_device(host, driver, username, password, secret, timeout) try: lldp = dev.get_lldp_neighbors_detail() except Exception as exc: log.warning(" NAPALM get_lldp_neighbors_detail() failed for %s: %s", host, exc) dev.close() except Exception as exc: log.error(" NAPALM connection failed for %s: %s", host, exc) total = sum(len(v) for v in lldp.values()) if total: log.info(" %s: %d LLDP neighbors", host, total) return lldp def collect_all_lldp(inventory: dict) -> dict: """Collect LLDP from all devices in inventory. Returns {device_hostname: lldp_neighbors_dict} where hostname is the inventory 'host' field (IP or name). We'll map to NetBox device names later. """ defaults = inventory.get("defaults", {}) all_lldp = {} devices = inventory["devices"] for entry in devices: cfg = merge_device_config(entry, defaults) host = cfg["host"] log.info("Collecting LLDP from %s (driver=%s)...", host, cfg["driver"]) lldp = collect_lldp_from_device(cfg) if lldp: # Store with the inventory host key — we'll resolve to NetBox name later all_lldp[host] = {"lldp": lldp, "config": cfg} return all_lldp # --------------------------------------------------------------------------- # LLDP → Cable matching # --------------------------------------------------------------------------- _ABBREV_TO_LONG = { "Gi": "GigabitEthernet", "Te": "TenGigabitEthernet", "Fa": "FastEthernet", "Et": "Ethernet", "Fo": "FortyGigabitEthernet", "Hu": "HundredGigE", "Lo": "Loopback", "Vl": "Vlan", "Po": "Port-channel", "Mg": "MgmtEth", "Tu": "Tunnel", "Se": "Serial", } _LONG_TO_ABBREV = {v: k for k, v in _ABBREV_TO_LONG.items()} def lookup_interface(iface_cache: dict, dev_id: int, iface_name: str) -> dict | None: """Look up an interface with fallback name variations. Tries multiple naming conventions: 1. Exact match 2. With/without space after type prefix 3. Abbreviated ↔ long form (Te ↔ TenGigabitEthernet) """ # Exact match result = iface_cache.get((dev_id, iface_name)) if result: return result m = re.match(r"^([A-Za-z]+)\s*(\d.*)$", iface_name) if not m: return None prefix, rest = m.group(1), m.group(2) # Try with/without space for fmt in [f"{prefix} {rest}", f"{prefix}{rest}"]: result = iface_cache.get((dev_id, fmt)) if result: return result # Try long form ↔ abbreviated form alt_prefix = _LONG_TO_ABBREV.get(prefix) or _ABBREV_TO_LONG.get(prefix) if alt_prefix: for fmt in [f"{alt_prefix} {rest}", f"{alt_prefix}{rest}"]: result = iface_cache.get((dev_id, fmt)) if result: return result return None def build_cable_list(all_lldp: dict, cache: dict, nb: NetBoxClient) -> list[dict]: """Build validated cable list from LLDP data + NetBox cache. Returns list of cable dicts: [{"a_device": str, "a_iface": str, "a_iface_id": int, "b_device": str, "b_iface": str, "b_iface_id": int, "skip_reason": str | None}] """ device_cache = cache["devices"] iface_cache = cache["interfaces"] cabled_ifaces = cache["cabled_ifaces"] # Build IP → device_name mapping by querying NetBox for primary IPs ip_to_device = _build_ip_to_device_map(nb, device_cache) # Build hostname → device_name (case-insensitive) for LLDP remote_system_name # Include both full names and short names (stripped FQDN) for matching name_lower = {} for name in device_cache: name_lower[name.lower()] = name # Also index short name (strip FQDN) if "." in name: short = name.split(".")[0].lower() if short not in name_lower: name_lower[short] = name seen_links = set() # for bidirectional dedup cables = [] stats = {"total_pairs": 0, "matched": 0, "skipped_device": 0, "skipped_iface": 0, "skipped_type": 0, "skipped_cabled": 0, "skipped_dup": 0} for host, data in all_lldp.items(): lldp = data["lldp"] # Resolve local device name from IP local_name = ip_to_device.get(host) if not local_name: log.warning(" %s: not found in NetBox, skipping all its neighbors", host) continue local_dev_id = device_cache[local_name] for local_iface_raw, neighbors in lldp.items(): for neighbor in neighbors: stats["total_pairs"] += 1 # -- Resolve remote device -- remote_sys = neighbor.get("remote_system_name", "").strip() # Strip FQDN if "." in remote_sys: remote_sys = remote_sys.split(".")[0] # Strip IOS-XR RP prefix if ":" in remote_sys: remote_sys = remote_sys.rsplit(":", 1)[-1] remote_name = name_lower.get(remote_sys.lower()) if not remote_name: # Try remote_system_description as fallback desc = neighbor.get("remote_system_description", "").strip() if desc: short = desc.split(".")[0] if "." in desc else desc remote_name = name_lower.get(short.lower()) if not remote_name: stats["skipped_device"] += 1 log.debug(" Skip: remote device '%s' not in NetBox", remote_sys) continue remote_dev_id = device_cache[remote_name] # -- Resolve local interface -- local_iface = normalize_interface_name(local_iface_raw.strip()) local_iface_info = lookup_interface(iface_cache, local_dev_id, local_iface) if not local_iface_info: stats["skipped_iface"] += 1 log.debug(" Skip: %s:%s not found in NetBox", local_name, local_iface) continue # -- Resolve remote interface -- remote_port_raw = (neighbor.get("remote_port", "") or neighbor.get("remote_port_description", "")).strip() remote_iface = normalize_interface_name(remote_port_raw) remote_iface_info = lookup_interface(iface_cache, remote_dev_id, remote_iface) if not remote_iface_info: stats["skipped_iface"] += 1 log.debug(" Skip: %s:%s not found in NetBox", remote_name, remote_iface) continue # -- Check interface types (no LAG / virtual) -- if local_iface_info["type"] in UNCABLEABLE_TYPES: stats["skipped_type"] += 1 log.debug(" Skip: %s:%s is type '%s' (uncableable)", local_name, local_iface, local_iface_info["type"]) continue if remote_iface_info["type"] in UNCABLEABLE_TYPES: stats["skipped_type"] += 1 log.debug(" Skip: %s:%s is type '%s' (uncableable)", remote_name, remote_iface, remote_iface_info["type"]) continue # -- Check if already cabled -- a_id = local_iface_info["id"] b_id = remote_iface_info["id"] if a_id in cabled_ifaces or b_id in cabled_ifaces: stats["skipped_cabled"] += 1 log.debug(" Skip: %s:%s or %s:%s already cabled", local_name, local_iface, remote_name, remote_iface) continue # -- Bidirectional dedup -- link_key = tuple(sorted([a_id, b_id])) if link_key in seen_links: stats["skipped_dup"] += 1 continue seen_links.add(link_key) cables.append({ "a_device": local_name, "a_iface": local_iface, "a_iface_id": a_id, "b_device": remote_name, "b_iface": remote_iface, "b_iface_id": b_id, }) stats["matched"] += 1 log.info("Cable matching summary:") log.info(" Total LLDP pairs: %d", stats["total_pairs"]) log.info(" Matched cables: %d", stats["matched"]) log.info(" Skipped (device not found): %d", stats["skipped_device"]) log.info(" Skipped (interface not found): %d", stats["skipped_iface"]) log.info(" Skipped (uncableable type): %d", stats["skipped_type"]) log.info(" Skipped (already cabled): %d", stats["skipped_cabled"]) log.info(" Skipped (bidirectional dup): %d", stats["skipped_dup"]) return cables def _build_ip_to_device_map(nb: NetBoxClient, device_cache: dict) -> dict: """Build {ip_address: device_name} mapping from NetBox interface IPs. This lets us map inventory hosts (which use IPs) to NetBox device names. """ ip_to_device = {} # Query all IP addresses and map to their assigned device ips = nb._get_all("/api/ipam/ip-addresses/", {"limit": 250}) for ip_entry in ips: addr = ip_entry.get("address", "") # "10.100.0.100/24" ip_only = addr.split("/")[0] if "/" in addr else addr assigned = ip_entry.get("assigned_object") if assigned and assigned.get("device"): dev_name = assigned["device"]["name"] if dev_name in device_cache: ip_to_device[ip_only] = dev_name log.info(" Built IP→device map: %d IPs mapped", len(ip_to_device)) return ip_to_device # --------------------------------------------------------------------------- # Output # --------------------------------------------------------------------------- def write_csv(cables: list[dict], output_path: str, site: str): """Write cables to CSV in NetBox bulk import format.""" with open(output_path, "w", newline="") as f: writer = csv.writer(f) writer.writerow([ "side_a_device", "side_a_type", "side_a_name", "side_b_device", "side_b_type", "side_b_name", "side_a_site", "side_b_site", "status", "tags", ]) for cable in cables: writer.writerow([ cable["a_device"], "dcim.interface", cable["a_iface"], cable["b_device"], "dcim.interface", cable["b_iface"], site, site, "connected", "lldp-discovered", ]) log.info("CSV written to %s (%d cables)", output_path, len(cables)) def create_cables_via_api(nb: NetBoxClient, cables: list[dict], tag_id: int | None) -> int: """Create cables via NetBox REST API. Returns count of successfully created cables.""" created = 0 tag_ids = [tag_id] if tag_id else None for cable in cables: try: nb.create_cable(cable["a_iface_id"], cable["b_iface_id"], tag_ids=tag_ids) log.info(" Created: %s:%s <-> %s:%s", cable["a_device"], cable["a_iface"], cable["b_device"], cable["b_iface"]) created += 1 except requests.HTTPError as exc: body = "" try: body = exc.response.json() except Exception: body = exc.response.text[:200] log.error(" Failed: %s:%s <-> %s:%s — %s", cable["a_device"], cable["a_iface"], cable["b_device"], cable["b_iface"], body) return created # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser(description="LLDP Cable Exporter for NetBox") parser.add_argument("-i", "--inventory", required=True, help="Path to inventory.yaml") parser.add_argument("--dry-run", action="store_true", help="Show cables without creating them") parser.add_argument("--csv-only", action="store_true", help="Only generate CSV, don't call API") parser.add_argument("--output", default="cables_export.csv", help="CSV output path") parser.add_argument("--site", default="main", help="NetBox site to target (default: main)") parser.add_argument("--log-level", default="INFO", help="Log level (DEBUG/INFO/WARNING)") parser.add_argument("--env-file", default=".env", help="Path to .env file") args = parser.parse_args() logging.basicConfig( level=getattr(logging, args.log_level.upper(), logging.INFO), format="%(asctime)s %(name)s %(levelname)s %(message)s", ) # Load environment env_path = Path(args.env_file) if env_path.exists(): load_dotenv(env_path) else: log.warning("Env file %s not found, using environment variables", args.env_file) netbox_url = os.environ.get("NETBOX_API_URL") netbox_token = os.environ.get("NETBOX_API_TOKEN") if not netbox_url or not netbox_token: log.error("NETBOX_API_URL and NETBOX_API_TOKEN must be set in .env") sys.exit(1) # Load inventory inventory = load_inventory(args.inventory) device_count = len(inventory.get("devices", [])) log.info("Loaded %d devices from inventory", device_count) # Connect to NetBox API nb = NetBoxClient(netbox_url, netbox_token) log.info("Building NetBox inventory cache (site=%s)...", args.site) cache = build_netbox_cache(nb, args.site) # Collect LLDP from all devices log.info("Collecting LLDP neighbors from devices...") all_lldp = collect_all_lldp(inventory) connected = len(all_lldp) log.info("LLDP collected from %d/%d devices", connected, device_count) # Match and validate cables log.info("Matching LLDP data against NetBox inventory...") cables = build_cable_list(all_lldp, cache, nb) if not cables: log.info("No valid cables to create.") return # Output if args.dry_run: log.info("DRY RUN — %d cables would be created:", len(cables)) for c in cables: log.info(" %s:%s <-> %s:%s", c["a_device"], c["a_iface"], c["b_device"], c["b_iface"]) write_csv(cables, args.output, args.site) return # Write CSV write_csv(cables, args.output, args.site) if args.csv_only: log.info("CSV-only mode — skipping API import") return # Ensure tag exists tag = nb.ensure_tag("lldp-discovered") tag_id = tag["id"] # Create cables via API log.info("Creating %d cables via NetBox API...", len(cables)) created = create_cables_via_api(nb, cables, tag_id) log.info("Done! Created %d/%d cables.", created, len(cables)) if __name__ == "__main__": main()