#!/usr/bin/env python3
"""Cisco Modeling Labs collector for NetBox via Diode SDK.

Syncs CML lab topology into NetBox: devices (nodes), interfaces,
cables (links), L3 addresses, and device configs.

Usage:
    python collectors/cml_collector.py --dry-run
    python collectors/cml_collector.py
"""

import argparse
import logging
import os
import re
import sys

from virl2_client import ClientLibrary
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
    Cable,
    Device,
    DeviceConfig,
    DeviceRole,
    DeviceType,
    Entity,
    GenericObject,
    Interface,
    IPAddress,
    Manufacturer,
    Platform,
    Site,
)

log = logging.getLogger("cml-collector")

# ---------------------------------------------------------------------------
# CML node definition → NetBox mappings
# ---------------------------------------------------------------------------

# Keyed by CML node definition id; the value is used as the NetBox platform
# name. Lookups elsewhere in this file fall back to the node definition id
# itself when a key is missing.
NODE_DEF_TO_PLATFORM = {
    "iosv": "Cisco IOS",
    "iosvl2": "Cisco IOS",
    "iosxrv": "Cisco IOS-XR",
    "iosxrv9000": "Cisco IOS-XR",
    "csr1000v": "Cisco IOS-XE",
    "cat8000v": "Cisco IOS-XE",
    "cat9000v": "Cisco IOS-XE",
    "nxosv": "Cisco NX-OS",
    "nxosv9000": "Cisco NX-OS",
    "asav": "Cisco ASA",
    "server": "Linux",
    "ubuntu": "Ubuntu",
    "alpine": "Alpine Linux",
    "desktop": "Linux",
    "trex": "TRex Traffic Generator",
    "wan_emulator": "Linux",
    "external_connector": "External Connector",
    "unmanaged_switch": "Unmanaged Switch",
}

# Keyed by CML node definition id; the value is used as the NetBox
# manufacturer name. Only Cisco images are listed; lookups elsewhere in this
# file fall back to "Generic".
NODE_DEF_TO_MANUFACTURER = {
    "iosv": "Cisco",
    "iosvl2": "Cisco",
    "iosxrv": "Cisco",
    "iosxrv9000": "Cisco",
    "csr1000v": "Cisco",
    "cat8000v": "Cisco",
    "cat9000v": "Cisco",
    "nxosv": "Cisco",
    "nxosv9000": "Cisco",
    "asav": "Cisco",
}

# Keyed by CML node definition id; the value is used as the NetBox device
# role name. Lookups elsewhere in this file fall back to "Network Device".
NODE_DEF_TO_ROLE = {
    "iosv": "Router",
    "iosvl2": "Switch",
    "iosxrv": "Router",
    "iosxrv9000": "Router",
    "csr1000v": "Router",
    "cat8000v": "Router",
    "cat9000v": "Switch",
    "nxosv": "Switch",
    "nxosv9000": "Switch",
    "asav": "Firewall",
    "server": "Server",
    "ubuntu": "Server",
    "alpine": "Server",
    "desktop": "Server",
    "external_connector": "Patch Panel",
    "unmanaged_switch": "Switch",
}
# Keyed by CML node definition id; the value is used as the NetBox device
# type model. Lookups below fall back to the node definition id itself.
NODE_DEF_TO_MODEL = {
    "iosv": "IOSv",
    "iosvl2": "IOSvL2",
    "iosxrv": "IOS-XRv",
    "iosxrv9000": "IOS-XRv 9000",
    "csr1000v": "CSR1000v",
    "cat8000v": "Catalyst 8000V",
    "cat9000v": "Catalyst 9000V",
    "nxosv": "NX-OSv",
    "nxosv9000": "NX-OSv 9000",
    "asav": "ASAv",
}

# CML node state → NetBox device status. States missing from this table are
# reported as "planned" by build_node_entity().
CML_STATE_TO_STATUS = {
    "BOOTED": "active",
    "STARTED": "active",
    "STOPPED": "offline",
    "DEFINED_ON_CORE": "planned",
    "QUEUED": "planned",
}

# Node states in which the node's interfaces are reported as enabled.
_RUNNING_STATES = ("BOOTED", "STARTED")

# Ordered (pattern, NetBox interface type) rules; the first match wins.
# Compiled once here because the lookup runs for every interface, IP address
# and cable termination.
_IFACE_TYPE_RULES = (
    (re.compile(r"^(Gi|GigabitEthernet)", re.IGNORECASE), "1000base-t"),
    (re.compile(r"^(Te|TenGig)", re.IGNORECASE), "10gbase-x-sfpp"),
    (re.compile(r"^(Fa|FastEthernet)", re.IGNORECASE), "100base-tx"),
    (re.compile(r"^(Lo|Loopback)", re.IGNORECASE), "virtual"),
    (re.compile(r"^(eth|ens|enp)", re.IGNORECASE), "1000base-t"),
)

# Interface type used when no rule in _IFACE_TYPE_RULES matches.
_DEFAULT_IFACE_TYPE = "virtual"

# Prefix lengths appended to discovered addresses that carry no mask.
# These are assumptions made by this collector, not values read from CML.
_DEFAULT_IPV4_PREFIX = 24
_DEFAULT_IPV6_PREFIX = 64

# Spellings of CML_VERIFY_SSL that enable certificate verification.
_TRUE_STRINGS = frozenset({"1", "true", "yes", "on"})


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

def _unquote(value: str) -> str:
    """Remove one matching pair of surrounding quotes from *value*, if any."""
    if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
        return value[1:-1]
    return value


def load_dotenv(path: str = ".env") -> None:
    """Load ``KEY=VALUE`` lines from *path* into ``os.environ``.

    Blank lines, ``#`` comments and lines without ``=`` are ignored, as are
    lines whose key is empty. A leading ``export `` on the key is dropped.
    Variables that are already set in the environment are left untouched.
    A missing file is not an error.

    Args:
        path: Location of the dotenv file.
    """
    if not os.path.isfile(path):
        return
    # utf-8-sig reads plain UTF-8 and also tolerates a leading BOM.
    with open(path, encoding="utf-8-sig") as fh:
        for raw_line in fh:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, val = line.partition("=")
            key = key.strip().removeprefix("export ").strip()
            if not key:
                # An empty variable name is rejected by os.environ; skip it
                # instead of aborting the whole run.
                continue
            os.environ.setdefault(key, _unquote(val.strip()))


def get_config() -> dict:
    """Read collector settings from ``CML_*`` environment variables.

    Returns:
        A dict with the keys ``host``, ``user``, ``password``, ``lab``,
        ``verify_ssl`` (bool) and ``site``.
    """
    verify_raw = os.environ.get("CML_VERIFY_SSL", "false")
    return {
        "host": os.environ.get("CML_HOST", ""),
        "user": os.environ.get("CML_USER", "admin"),
        "password": os.environ.get("CML_PASSWORD", ""),
        "lab": os.environ.get("CML_LAB", ""),
        # Verification is off unless explicitly enabled.
        "verify_ssl": verify_raw.strip().lower() in _TRUE_STRINGS,
        "site": os.environ.get("CML_SITE", "CML"),
    }


# ---------------------------------------------------------------------------
# Device / interface reference helpers
# ---------------------------------------------------------------------------

def _device_kwargs(name: str, node_def: str, site_name: str) -> dict[str, object]:
    """Return the identifying Device fields shared by full devices and references."""
    return {
        "name": name,
        "device_type": DeviceType(
            model=NODE_DEF_TO_MODEL.get(node_def, node_def),
            manufacturer=Manufacturer(
                name=NODE_DEF_TO_MANUFACTURER.get(node_def, "Generic"),
            ),
        ),
        "role": DeviceRole(name=NODE_DEF_TO_ROLE.get(node_def, "Network Device")),
        "site": Site(name=site_name),
    }


def _device_ref(name: str, node_def: str, site_name: str) -> Device:
    """Build the Device used as the parent reference of interfaces and configs."""
    return Device(**_device_kwargs(name, node_def, site_name))


def _iface_name(iface) -> str:
    """Return the interface label, or ``iface<slot>`` when the label is empty."""
    return iface.label or f"iface{iface.slot}"


def _interface_type(iface_name: str) -> str:
    """Map an interface name to a NetBox interface type via _IFACE_TYPE_RULES."""
    for pattern, iface_type in _IFACE_TYPE_RULES:
        if pattern.match(iface_name):
            return iface_type
    return _DEFAULT_IFACE_TYPE


def _interface_ref(node_label: str, node_def: str, site_name: str, iface_name: str) -> Interface:
    """Build the Interface reference used by IP address and cable entities.

    The type comes from the same name-based rules as the interface entity so
    that every entity describing one interface agrees on its type.
    """
    return Interface(
        device=_device_ref(node_label, node_def, site_name),
        name=iface_name,
        type=_interface_type(iface_name),
    )


# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------

def build_node_entity(node, site_name: str) -> Entity:
    """Build the Device entity for a CML node.

    Args:
        node: CML node; ``label``, ``node_definition`` and ``state`` are read.
        site_name: Name of the NetBox site the device is placed in.

    Returns:
        An Entity wrapping the Device, tagged ``cml``.
    """
    node_def = node.node_definition or "unknown"
    return Entity(device=Device(
        **_device_kwargs(node.label, node_def, site_name),
        platform=Platform(name=NODE_DEF_TO_PLATFORM.get(node_def, node_def)),
        status=CML_STATE_TO_STATUS.get(node.state, "planned"),
        tags=["cml"],
    ))


def build_interface_entities(node, site_name: str) -> list[Entity]:
    """Build one Interface entity per interface of a CML node.

    Args:
        node: CML node whose ``interfaces()`` are enumerated.
        site_name: Name of the NetBox site of the parent device.

    Returns:
        Interface entities; ``enabled`` reflects whether the node is running.
    """
    node_def = node.node_definition or "unknown"
    enabled = node.state in _RUNNING_STATES
    entities = []
    for iface in node.interfaces():
        iface_name = _iface_name(iface)
        entities.append(Entity(interface=Interface(
            device=_device_ref(node.label, node_def, site_name),
            name=iface_name,
            type=_interface_type(iface_name),
            enabled=enabled,
            tags=["cml"],
        )))
    return entities


def _ip_entity(address: str, interface: Interface) -> Entity:
    """Wrap an address and its interface reference in an active IPAddress entity."""
    return Entity(ip_address=IPAddress(
        address=address,
        status="active",
        assigned_object_interface=interface,
        tags=["cml"],
    ))


def build_ip_entities(node, site_name: str) -> list[Entity]:
    """Build IP entities from CML node L3 addresses.

    IPv4 addresses starting with ``127.`` and IPv6 addresses starting with
    ``fe80`` are skipped. Addresses without a mask get a default prefix
    length (see _DEFAULT_IPV4_PREFIX / _DEFAULT_IPV6_PREFIX).

    Args:
        node: CML node whose interfaces are inspected.
        site_name: Name of the NetBox site of the parent device.

    Returns:
        IPAddress entities for every usable discovered address.
    """
    entities = []
    node_def = node.node_definition or "unknown"
    for iface in node.interfaces():
        iface_name = _iface_name(iface)
        # The discovered_* attributes may be absent, empty, or fail to
        # resolve; that only costs us this interface's addresses.
        try:
            # Read each attribute once; a missing one counts as "no addresses".
            for addr in getattr(iface, "discovered_ipv4", None) or ():
                if addr and not addr.startswith("127."):
                    ip_str = addr if "/" in addr else f"{addr}/{_DEFAULT_IPV4_PREFIX}"
                    entities.append(_ip_entity(
                        ip_str,
                        _interface_ref(node.label, node_def, site_name, iface_name),
                    ))
            for addr in getattr(iface, "discovered_ipv6", None) or ():
                if addr and not addr.lower().startswith("fe80"):
                    ip_str = addr if "/" in addr else f"{addr}/{_DEFAULT_IPV6_PREFIX}"
                    entities.append(_ip_entity(
                        ip_str,
                        _interface_ref(node.label, node_def, site_name, iface_name),
                    ))
        except Exception as exc:
            log.debug(" IP discovery unavailable for %s:%s: %s", node.label, iface_name, exc)
    return entities


def build_cable_entities(
    lab,
    site_name: str,
    node_map: dict[str, tuple[str, str]],
) -> list[Entity]:
    """Build Cable entities from CML links.

    Args:
        lab: CML lab whose ``links()`` are enumerated.
        site_name: Name of the NetBox site of the terminating devices.
        node_map: {node_id: (node_label, node_definition)}. Currently unused;
            both endpoints are read from the link itself.

    Returns:
        One Cable entity per link that could be resolved. Links that fail are
        logged and skipped.
    """
    entities = []
    for link in lab.links():
        try:
            iface_a = link.interface_a
            iface_b = link.interface_b
            node_a = iface_a.node
            node_b = iface_b.node
            a_name = _iface_name(iface_a)
            b_name = _iface_name(iface_b)
            # NOTE(review): names that match no rule in _IFACE_TYPE_RULES still
            # resolve to "virtual" here; confirm NetBox accepts a cable on such
            # an interface for the node types in use.
            cable = Cable(
                a_terminations=[GenericObject(object_interface=_interface_ref(
                    node_a.label, node_a.node_definition or "unknown", site_name, a_name,
                ))],
                b_terminations=[GenericObject(object_interface=_interface_ref(
                    node_b.label, node_b.node_definition or "unknown", site_name, b_name,
                ))],
                status="connected",
                tags=["cml"],
            )
            entities.append(Entity(cable=cable))
            log.info(" Cable: %s:%s <-> %s:%s", node_a.label, a_name, node_b.label, b_name)
        except Exception as exc:
            log.warning(" Failed to build cable for link: %s", exc)
    return entities


def build_config_entity(node, site_name: str) -> Entity | None:
    """Build DeviceConfig entity from CML node configuration.

    Args:
        node: CML node; its ``config`` attribute is read.
        site_name: Name of the NetBox site of the parent device.

    Returns:
        The entity, or None when the node has no config or reading it fails.
    """
    node_def = node.node_definition or "unknown"
    try:
        config = node.config
        if not config:
            return None
        # NOTE(review): both `device` and `device_config` are passed to Entity;
        # confirm against the Diode SDK that this attaches the config to the
        # device as intended.
        return Entity(
            device=_device_ref(node.label, node_def, site_name),
            device_config=DeviceConfig(startup=config.encode("utf-8")),
        )
    except Exception as exc:
        log.debug(" Config unavailable for %s: %s", node.label, exc)
        return None


# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------

def _collect_lab_entities(lab, site_name: str) -> list[Entity]:
    """Sync one lab and return the entities for its nodes and links."""
    lab.sync()
    log.info("Lab: %s (%s) — %d nodes, %d links",
             lab.title, lab.state(), len(lab.nodes()), len(lab.links()))

    entities: list[Entity] = []
    node_map = {}
    for node in lab.nodes():
        node_def = node.node_definition or "unknown"
        node_map[node.id] = (node.label, node_def)

        # Skip external connectors and unmanaged switches for device creation
        if node_def in ("external_connector", "unmanaged_switch"):
            log.debug(" Skipping non-device node: %s (%s)", node.label, node_def)
            # Still create interface entities for cable termination
            entities.extend(build_interface_entities(node, site_name))
            continue

        entities.append(build_node_entity(node, site_name))
        entities.extend(build_interface_entities(node, site_name))
        entities.extend(build_ip_entities(node, site_name))
        config_entity = build_config_entity(node, site_name)
        if config_entity:
            entities.append(config_entity)

    # Cables go last so the interfaces they terminate on precede them.
    entities.extend(build_cable_entities(lab, site_name, node_map))
    return entities


def collect_all_entities(cfg: dict) -> list[Entity]:
    """Connect to CML and collect entities for the selected lab(s).

    Args:
        cfg: Settings as returned by get_config().

    Returns:
        All entities, lab by lab, in ingestion order.

    Exits the process with status 1 when ``host`` is empty or when the lab
    named in ``lab`` does not exist.
    """
    host = cfg["host"]
    if not host:
        log.error("CML_HOST not set")
        sys.exit(1)

    # Only a value that already carries a scheme is used verbatim; a bare
    # hostname (even one beginning with "http") gets https://.
    url = host if "://" in host else f"https://{host}"
    log.info("Connecting to CML at %s...", url)

    client = ClientLibrary(url, cfg["user"], cfg["password"], ssl_verify=cfg["verify_ssl"])
    site_name = cfg["site"]

    labs = client.all_labs()
    target_lab = cfg.get("lab")
    if target_lab:
        # The filter accepts either the lab title or the lab id.
        labs = [lab for lab in labs if target_lab in (lab.title, lab.id)]
        if not labs:
            log.error("Lab '%s' not found", target_lab)
            sys.exit(1)

    entities: list[Entity] = []
    for lab in labs:
        entities.extend(_collect_lab_entities(lab, site_name))
    return entities


def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
    """Send *entities* to Diode, or only log them when *dry_run* is set.

    Connection settings come from ``DIODE_TARGET``, ``DIODE_CLIENT_ID`` /
    ``INGESTER_CLIENT_ID`` and ``DIODE_CLIENT_SECRET`` /
    ``INGESTER_CLIENT_SECRET``.

    Args:
        entities: Entities to ingest; an empty list only logs a warning.
        dry_run: When true, log each entity and return without connecting.

    Exits the process with status 1 when no client secret is configured.
    """
    if not entities:
        log.warning("No entities to ingest")
        return

    target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
    client_id = os.environ.get(
        "DIODE_CLIENT_ID", os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
    client_secret = os.environ.get(
        "DIODE_CLIENT_SECRET", os.environ.get("INGESTER_CLIENT_SECRET", ""))

    if dry_run:
        log.info("DRY RUN: %d entities would be ingested", len(entities))
        for i, e in enumerate(entities):
            log.info(" [%d] %s", i, e)
        return

    if not client_secret:
        log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
        sys.exit(1)

    log.info("Ingesting %d entities to %s ...", len(entities), target)

    # NOTE(review): the chunking helper is looked up at the SDK package root
    # first and then at the module this file originally imported it from;
    # confirm which location the installed SDK version provides.
    try:
        from netboxlabs.diode.sdk import create_message_chunks
    except ImportError:
        from netboxlabs.diode.sdk.ingester import create_message_chunks

    with DiodeClient(
        target=target,
        client_id=client_id,
        client_secret=client_secret,
        app_name="cml-collector",
        app_version="0.1.0",
    ) as client:
        for idx, chunk in enumerate(create_message_chunks(entities)):
            resp = client.ingest(entities=chunk)
            if resp.errors:
                log.error("Chunk %d errors: %s", idx, resp.errors)
            else:
                log.info("Chunk %d: %d entities ingested", idx, len(chunk))


def main() -> None:
    """Parse command-line options, collect the CML topology and ingest it."""
    parser = argparse.ArgumentParser(description="CML topology collector for NetBox")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--log-level", default="INFO",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR"])
    parser.add_argument("--env-file", default=".env")
    args = parser.parse_args()

    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )

    # The env file must be loaded before get_config() reads the environment.
    load_dotenv(args.env_file)
    cfg = get_config()

    entities = collect_all_entities(cfg)
    log.info("Total entities: %d", len(entities))
    ingest_entities(entities, dry_run=args.dry_run)
    log.info("Done!")


if __name__ == "__main__":
    main()