#!/usr/bin/env python3
"""Proxmox VE collector for NetBox via Diode SDK.

Discovers nodes, QEMU VMs, LXC containers, interfaces, IPs, and disks
from a Proxmox VE host and ingests them into NetBox through the Diode pipeline.
"""

import argparse
import logging
import os
import re
import sys

from proxmoxer import ProxmoxAPI
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
    Cluster,
    ClusterType,
    Device,
    DeviceRole,
    DeviceType,
    Entity,
    Interface,
    IPAddress,
    Manufacturer,
    Platform,
    Site,
    VirtualDisk,
    VirtualMachine,
    VMInterface,
)

log = logging.getLogger("proxmox-collector")

# ---------------------------------------------------------------------------
# Status / type mapping tables
# ---------------------------------------------------------------------------

PVE_TO_NETBOX_STATUS = {
    "running": "active",
    "stopped": "offline",
    "paused": "offline",
    "suspended": "offline",
    "unknown": "planned",
}

PVE_OSTYPE_MAP = {
    # QEMU
    "l26": "Linux",
    "l24": "Linux",
    "win10": "Windows 10/Server 2016+",
    "win11": "Windows 11/Server 2022+",
    "win8": "Windows 8/Server 2012",
    "win7": "Windows 7/Server 2008 R2",
    "wxp": "Windows XP",
    "w2k": "Windows 2000",
    "solaris": "Solaris",
    "other": "Other",
    # LXC
    "debian": "Debian",
    "ubuntu": "Ubuntu",
    "centos": "CentOS",
    "fedora": "Fedora",
    "opensuse": "openSUSE",
    "archlinux": "Arch Linux",
    "alpine": "Alpine Linux",
    "gentoo": "Gentoo",
    "nixos": "NixOS",
    "unmanaged": "Unmanaged",
}

PVE_IFACE_TYPE_MAP = {
    "eth": "1000base-t",
    "bond": "lag",
    "bridge": "bridge",
    "vlan": "virtual",
    "OVSBridge": "bridge",
    "OVSBond": "lag",
    "OVSPort": "virtual",
    "OVSIntPort": "virtual",
    "veth": "virtual",
    "lo": "virtual",
}

QEMU_DISK_RE = re.compile(r"^(scsi|virtio|ide|sata|efidisk)\d+$")
LXC_DISK_RE = re.compile(r"^(rootfs|mp\d+)$")

# Patterns that used to be recompiled inside loops; compiled once here.
NET_KEY_RE = re.compile(r"^net\d+$")
MAC_RE = re.compile(r"^[0-9A-Fa-f]{2}(:[0-9A-Fa-f]{2}){5}$")
DISK_SIZE_RE = re.compile(r"(\d+(?:\.\d+)?)\s*([GMTK]?)", re.IGNORECASE)


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

def load_dotenv(path: str = ".env") -> None:
    """Load key=value pairs from a .env file into os.environ.

    Blank lines, ``#`` comment lines, and lines without ``=`` are ignored.
    Variables already present in the environment are NOT overwritten
    (``setdefault``). A missing file is silently skipped.
    """
    if not os.path.isfile(path):
        return
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            key = key.strip()
            if not key:
                # "=value" has no variable name; an empty environment key is invalid.
                continue
            value = value.strip().strip("\"'")
            os.environ.setdefault(key, value)


def get_config(require_diode: bool = True) -> dict:
    """Read and validate configuration from environment variables.

    Args:
        require_diode: When True (default) the Diode client secret is
            mandatory. Pass False for dry runs, which never open a Diode
            connection and therefore need no credentials.

    Returns:
        A dict with the keys ``pve_host``, ``pve_user``, ``pve_token_name``,
        ``pve_token_value``, ``pve_verify_ssl``, ``pve_port``,
        ``diode_target``, ``client_id``, ``client_secret`` and ``site_name``.

    Exits the process with status 1 (after logging an error) when a required
    variable is missing or ``PVE_PORT`` is not an integer.
    """
    raw_port = os.getenv("PVE_PORT", "8006")
    try:
        pve_port = int(raw_port)
    except ValueError:
        log.error("PVE_PORT must be an integer, got %r", raw_port)
        sys.exit(1)

    cfg = {
        "pve_host": os.getenv("PVE_HOST"),
        "pve_user": os.getenv("PVE_USER", "root@pam"),
        "pve_token_name": os.getenv("PVE_TOKEN_NAME"),
        "pve_token_value": os.getenv("PVE_TOKEN_VALUE"),
        "pve_verify_ssl": os.getenv("PVE_VERIFY_SSL", "false").lower() in ("true", "1", "yes"),
        "pve_port": pve_port,
        "diode_target": os.getenv("DIODE_TARGET", "grpc://localhost:8080/diode"),
        "client_id": os.getenv("INGESTER_CLIENT_ID", os.getenv("DIODE_CLIENT_ID", "diode-ingester")),
        "client_secret": os.getenv("INGESTER_CLIENT_SECRET", os.getenv("DIODE_CLIENT_SECRET")),
        "site_name": os.getenv("SITE_NAME", "main"),
    }

    # Map config keys to the variable names a user actually has to set, so the
    # error message is actionable.
    required = {
        "pve_host": "PVE_HOST",
        "pve_token_name": "PVE_TOKEN_NAME",
        "pve_token_value": "PVE_TOKEN_VALUE",
    }
    if require_diode:
        required["client_secret"] = "INGESTER_CLIENT_SECRET (or DIODE_CLIENT_SECRET)"

    missing = [env_name for key, env_name in required.items() if not cfg[key]]
    if missing:
        log.error("Missing required env vars: %s", ", ".join(missing))
        sys.exit(1)
    return cfg


# ---------------------------------------------------------------------------
# PVE connection
# ---------------------------------------------------------------------------

def connect_pve(config: dict) -> ProxmoxAPI:
    """Create and return a ProxmoxAPI connection using API-token auth over HTTPS."""
    return ProxmoxAPI(
        config["pve_host"],
        port=config["pve_port"],
        user=config["pve_user"],
        token_name=config["pve_token_name"],
        token_value=config["pve_token_value"],
        verify_ssl=config["pve_verify_ssl"],
        backend="https",
    )


# ---------------------------------------------------------------------------
# Data collection (pure PVE API calls)
# ---------------------------------------------------------------------------

def collect_node_info(prox: ProxmoxAPI, node: str) -> dict:
    """Return the ``status`` payload of a node."""
    return prox.nodes(node).status.get()


def collect_node_networks(prox: ProxmoxAPI, node: str) -> list[dict]:
    """Return the ``network`` listing of a node."""
    return prox.nodes(node).network.get()


def collect_qemu_vms(prox: ProxmoxAPI, node: str) -> list[dict]:
    """Return the QEMU VM listing of a node."""
    return prox.nodes(node).qemu.get()


def collect_vm_config(prox: ProxmoxAPI, node: str, vmid: int) -> dict:
    """Return the configuration of one QEMU VM."""
    return prox.nodes(node).qemu(vmid).config.get()


def collect_vm_guest_agent_ips(prox: ProxmoxAPI, node: str, vmid: int) -> list[dict] | None:
    """Query the QEMU guest agent for the VM's network interfaces.

    Returns the interface list, or None when the agent call fails or the
    response is not a list (so callers can always iterate the result safely).
    """
    try:
        resp = prox.nodes(node).qemu(vmid).agent("network-get-interfaces").get()
    except Exception as exc:  # best effort: agent missing/not running is expected
        log.debug("Guest agent unavailable for VM %s: %s", vmid, exc)
        return None
    if isinstance(resp, dict):
        # The payload may be wrapped as {"result": [...]}.
        resp = resp.get("result", resp)
    return resp if isinstance(resp, list) else None


def collect_lxc_interfaces(prox: ProxmoxAPI, node: str, vmid: int) -> list[dict] | None:
    """Get runtime network interfaces for a running LXC container (includes DHCP IPs).

    Returns None when the call fails or yields an empty result.
    """
    try:
        resp = prox.nodes(node).lxc(vmid).interfaces.get()
        return resp if resp else None
    except Exception as exc:  # best effort: stopped containers have no runtime data
        log.debug("Interfaces unavailable for LXC %s: %s", vmid, exc)
        return None


def collect_lxc_containers(prox: ProxmoxAPI, node: str) -> list[dict]:
    """Return the LXC container listing of a node."""
    return prox.nodes(node).lxc.get()


def collect_lxc_config(prox: ProxmoxAPI, node: str, vmid: int) -> dict:
    """Return the configuration of one LXC container."""
    return prox.nodes(node).lxc(vmid).config.get()


# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------

def _parse_kv_list(raw: str) -> dict:
    """Split a comma-separated ``k=v`` string into a dict; parts without ``=`` are dropped."""
    result = {}
    for part in raw.split(","):
        if "=" not in part:
            continue
        key, value = part.split("=", 1)
        result[key] = value
    return result


def parse_pve_net_config(raw: str) -> dict:
    """Parse QEMU net config: 'virtio=AA:BB:CC:DD:EE:FF,bridge=vmbr0,firewall=1'.

    Besides the raw key/value pairs, the result gets ``model`` and ``mac``
    entries taken from the first pair whose value looks like a MAC address
    (the ``model=mac`` pair, e.g. ``virtio=AA:BB:...``).
    """
    result = _parse_kv_list(raw)
    # Iterate over a snapshot: the loop body adds keys to ``result``.
    for key, value in list(result.items()):
        if MAC_RE.match(value):
            result["model"] = key
            result["mac"] = value
            break
    return result


def parse_lxc_net_config(raw: str) -> dict:
    """Parse LXC net config: 'name=eth0,bridge=vmbr0,hwaddr=...,ip=10.0.0.5/24'."""
    return _parse_kv_list(raw)


def parse_disk_size(size_str: str) -> int:
    """Parse '32G', '512M', '1T' to integer GB.

    Sub-gigabyte sizes given in M or K are rounded up to 1 so that a real
    disk never reports as 0. Unparseable input returns 0.
    """
    # NOTE(review): a value without a unit suffix is treated as GB here;
    # confirm against the PVE size format whether unit-less means bytes.
    m = DISK_SIZE_RE.match(size_str)
    if not m:
        return 0
    value = float(m.group(1))
    unit = m.group(2).upper()
    if unit == "T":
        return int(value * 1024)
    if unit in ("G", ""):
        return int(value)
    if unit == "M":
        return max(1, int(value / 1024))
    if unit == "K":
        return max(1, int(value / (1024 * 1024)))
    return int(value)


def parse_pve_disk_config(raw: str) -> dict:
    """Parse PVE disk config: 'local-lvm:vm-100-disk-0,size=32G'.

    Returns a dict with ``storage``, ``volume`` and ``size_gb`` (0 when no
    ``size=`` option is present).
    """
    result = {"storage": "", "volume": "", "size_gb": 0}
    parts = raw.split(",")  # str.split always yields at least one element

    # First part is storage:volume
    storage_volume = parts[0]
    if ":" in storage_volume:
        result["storage"], result["volume"] = storage_volume.split(":", 1)
    else:
        result["volume"] = storage_volume

    # Find size=
    for part in parts:
        if part.startswith("size="):
            result["size_gb"] = parse_disk_size(part[5:])
            break
    return result


# ---------------------------------------------------------------------------
# Mapping helpers
# ---------------------------------------------------------------------------

def map_pve_status(pve_status: str) -> str:
    """Map a PVE guest status to a NetBox status; unmapped values become 'active'."""
    # A missing/None status is treated like PVE's own "unknown".
    return PVE_TO_NETBOX_STATUS.get((pve_status or "unknown").lower(), "active")


def map_pve_interface_type(pve_type: str, iface_name: str) -> str:
    """Map a PVE interface type to a NetBox interface type.

    Falls back to guessing from the interface name, then to 'other'.
    """
    if pve_type in PVE_IFACE_TYPE_MAP:
        return PVE_IFACE_TYPE_MAP[pve_type]
    if iface_name.startswith(("eno", "enp", "ens", "eth")):
        return "1000base-t"
    if iface_name.startswith("vmbr"):
        return "bridge"
    return "other"


def map_ostype(ostype: str) -> str:
    """Map a PVE ostype code to a platform name; unknown codes pass through unchanged."""
    return PVE_OSTYPE_MAP.get(ostype, ostype or "Other")


def build_mac_to_netkey_map(vm_config: dict) -> dict[str, str]:
    """Build MAC address -> PVE net key mapping (e.g. 'aa:bb:...' -> 'net0').

    MAC addresses are lower-cased so lookups are case-insensitive.
    """
    mac_map = {}
    for key, value in vm_config.items():
        if not NET_KEY_RE.match(key):
            continue
        mac = parse_pve_net_config(str(value)).get("mac", "").lower()
        if mac:
            mac_map[mac] = key
    return mac_map


def _iter_disks(vm_config: dict, vm_type: str) -> list[tuple[str, dict]]:
    """Return (config key, parsed disk) pairs for real disks, sorted by key.

    CD-ROM drives and empty ('none') drives are excluded.
    """
    pattern = QEMU_DISK_RE if vm_type == "qemu" else LXC_DISK_RE
    disks = []
    for key, value in sorted(vm_config.items()):
        if not pattern.match(key):
            continue
        raw = str(value)
        if "media=cdrom" in raw or raw.startswith("none"):
            continue
        disks.append((key, parse_pve_disk_config(raw)))
    return disks


def sum_disk_sizes(vm_config: dict, vm_type: str) -> int:
    """Sum all disk sizes from a VM/LXC config, return total in GB."""
    return sum(disk["size_gb"] for _, disk in _iter_disks(vm_config, vm_type))


def _resolve_vm_name(vm_data: dict, vm_config: dict, vm_type: str) -> str:
    """Pick the guest name: config hostname, then listing name, then 'vm-<id>'/'ct-<id>'.

    Shared by the entity builder and the orchestrator so that a guest and the
    interfaces/disks/IPs referencing it always agree on the name.
    """
    prefix = "vm" if vm_type == "qemu" else "ct"
    return vm_config.get("hostname") or vm_data.get("name") or f"{prefix}-{vm_data['vmid']}"


def _is_ignorable_address(addr: str) -> bool:
    """True for empty, IPv4 loopback, IPv6 loopback and IPv6 link-local addresses."""
    if not addr:
        return True
    lowered = addr.lower()
    return lowered.startswith(("127.", "fe80::")) or lowered == "::1"


# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------

def build_cluster_entity(node_name: str, site_name: str) -> Entity:
    """Build the Cluster entity for a node (the cluster is named after the node)."""
    return Entity(cluster=Cluster(
        name=node_name,
        type=ClusterType(name="Proxmox VE"),
        scope_site=Site(name=site_name),
        status="active",
        tags=["proxmox"],
    ))


def build_node_device_entity(node_name: str, node_status: dict, site_name: str) -> Entity:
    """Build the Device entity for a PVE node from its status payload.

    PVE version and kernel go into the description; CPU and memory figures
    go into the comments.
    """
    cpuinfo = node_status.get("cpuinfo", {})
    pve_version = node_status.get("pveversion", "")
    kernel = node_status.get("kversion", "")
    mem_gb = node_status.get("memory", {}).get("total", 0) / (1024 ** 3)

    return Entity(device=Device(
        name=node_name,
        device_type=DeviceType(
            model="Proxmox VE Host",
            manufacturer=Manufacturer(name="Proxmox Server Solutions GmbH"),
        ),
        role=DeviceRole(name="Hypervisor"),
        platform=Platform(
            name="Proxmox VE",
            manufacturer=Manufacturer(name="Proxmox Server Solutions GmbH"),
        ),
        site=Site(name=site_name),
        status="active",
        description=f"PVE {pve_version}, kernel {kernel}",
        comments=f"CPU: {cpuinfo.get('model', 'N/A')}, "
                 f"Sockets: {cpuinfo.get('sockets', '?')}, "
                 f"Cores: {cpuinfo.get('cores', '?')}, "
                 f"Threads: {cpuinfo.get('cpus', '?')}, "
                 f"Memory: {mem_gb:.1f} GB",
        tags=["proxmox", "hypervisor"],
    ))


def build_node_interface_entities(
    node_name: str,
    interfaces: list[dict],
    site_name: str,
) -> list[Entity]:
    """Build one Interface entity per node network interface (loopback excluded)."""
    entities = []
    for iface in interfaces:
        name = iface.get("iface", "")
        if name == "lo":
            continue
        pve_type = iface.get("type", "")
        entities.append(Entity(interface=Interface(
            device=Device(name=node_name, site=Site(name=site_name)),
            name=name,
            type=map_pve_interface_type(pve_type, name),
            enabled=bool(iface.get("active", 0)),
            mtu=iface.get("mtu"),
            description=_iface_description(iface),
            tags=["proxmox"],
        )))
    return entities


def _iface_description(iface: dict) -> str:
    """Join bridge ports, bond slaves and the PVE comment into one description string."""
    parts = []
    if iface.get("bridge_ports"):
        parts.append(f"bridge_ports: {iface['bridge_ports']}")
    slaves = iface.get("bond_slaves") or iface.get("slaves")
    if slaves:
        parts.append(f"slaves: {slaves}")
    if iface.get("comments"):
        parts.append(iface["comments"])
    return ", ".join(parts)


def build_node_ip_entities(
    node_name: str,
    interfaces: list[dict],
    site_name: str,
) -> list[Entity]:
    """Build IPAddress entities for node interfaces that carry an address.

    Uses the ``cidr`` field when present, otherwise ``address`` + ``netmask``.
    Interfaces without an address, and the loopback, are skipped.
    """
    entities = []
    for iface in interfaces:
        name = iface.get("iface", "")
        if name == "lo":
            continue

        cidr = iface.get("cidr")
        address = iface.get("address")
        netmask = iface.get("netmask")
        if cidr:
            ip_str = cidr
        elif address and netmask:
            ip_str = f"{address}/{_netmask_to_prefix(netmask)}"
        else:
            continue

        pve_type = iface.get("type", "")
        entities.append(Entity(ip_address=IPAddress(
            address=ip_str,
            status="active",
            assigned_object_interface=Interface(
                device=Device(
                    name=node_name,
                    device_type=DeviceType(
                        model="Proxmox VE Host",
                        manufacturer=Manufacturer(name="Proxmox Server Solutions GmbH"),
                    ),
                    role=DeviceRole(name="Hypervisor"),
                    site=Site(name=site_name),
                ),
                name=name,
                type=map_pve_interface_type(pve_type, name),
            ),
            tags=["proxmox"],
        )))
    return entities


def _netmask_to_prefix(mask: str) -> int:
    """Convert a netmask to a prefix length.

    Accepts dotted-quad masks ('255.255.255.0' -> 24) as well as masks that
    are already a prefix length ('24' or 24 -> 24). Anything unparseable or
    out of the 0-32 range falls back to 24.
    """
    text = str(mask).strip()
    try:
        if "." not in text:
            # Already a prefix length; popcounting it would give nonsense.
            prefix = int(text)
            return prefix if 0 <= prefix <= 32 else 24
        return sum(bin(int(octet)).count("1") for octet in text.split("."))
    except ValueError:
        return 24


def _first_ipv4(ip_entities: list[Entity]) -> str | None:
    """Extract the first IPv4 address (with prefix) from a list of IPAddress entities."""
    for ent in ip_entities:
        addr = ent.ip_address.address
        if addr and ":" not in addr:  # skip IPv6
            return addr
    return None


def _vm_ref(vm_name: str, node_name: str, site_name: str, role_name: str) -> VirtualMachine:
    """Build a rich VirtualMachine reference with enough context for the reconciler."""
    return VirtualMachine(
        name=vm_name,
        site=Site(name=site_name),
        cluster=Cluster(
            name=node_name,
            type=ClusterType(name="Proxmox VE"),
            scope_site=Site(name=site_name),
        ),
        role=DeviceRole(name=role_name),
    )


def build_vm_entity(
    vm_data: dict,
    vm_config: dict,
    node_name: str,
    site_name: str,
    vm_type: str,
    primary_ip4: str | None = None,
) -> Entity:
    """Build the VirtualMachine entity for a QEMU VM or LXC container.

    Args:
        vm_data: Entry from the node's guest listing (needs ``vmid``).
        vm_config: The guest's configuration dict.
        node_name: Node hosting the guest; also used as the cluster name.
        site_name: NetBox site name.
        vm_type: ``"qemu"`` for VMs; any other value is treated as LXC.
        primary_ip4: Optional CIDR string to set as the primary IPv4.
    """
    vm_name = _resolve_vm_name(vm_data, vm_config, vm_type)
    memory_mb = int(vm_config.get("memory", 0))

    if vm_type == "qemu":
        # QEMU exposes sockets x cores; LXC only has a core count.
        vcpus = int(vm_config.get("cores", 1)) * int(vm_config.get("sockets", 1))
        role_name = "Virtual Machine"
        tags = ["proxmox", "qemu"]
    else:
        vcpus = int(vm_config.get("cores", 1))
        role_name = "LXC Container"
        tags = ["proxmox", "lxc"]

    ostype = vm_config.get("ostype", "other")
    disk_gb = sum_disk_sizes(vm_config, vm_type)

    vm_kwargs = dict(
        name=vm_name,
        status=map_pve_status(vm_data.get("status", "unknown")),
        site=Site(name=site_name),
        cluster=Cluster(
            name=node_name,
            type=ClusterType(name="Proxmox VE"),
            scope_site=Site(name=site_name),
        ),
        role=DeviceRole(name=role_name),
        platform=Platform(name=map_ostype(ostype)),
        vcpus=float(vcpus),
        memory=memory_mb,
        disk=disk_gb,
        description=f"VMID: {vm_data['vmid']}",
        comments=vm_config.get("description", ""),
        tags=tags,
    )
    if primary_ip4:
        vm_kwargs["primary_ip4"] = IPAddress(address=primary_ip4)

    return Entity(virtual_machine=VirtualMachine(**vm_kwargs))


def build_vm_interface_entities(
    vm_name: str,
    vm_config: dict,
    vm_type: str,
    node_name: str = "",
    site_name: str = "",
) -> list[Entity]:
    """Build one VMInterface entity per ``netN`` entry of the guest config.

    LXC interfaces are named after their ``name=`` option (e.g. eth0);
    QEMU interfaces keep the PVE key (e.g. net0).
    """
    role_name = "Virtual Machine" if vm_type == "qemu" else "LXC Container"
    entities = []
    for key, value in sorted(vm_config.items()):
        if not NET_KEY_RE.match(key):
            continue
        raw = str(value)
        if vm_type == "lxc":
            parsed = parse_lxc_net_config(raw)
            name = parsed.get("name", key)
        else:
            parsed = parse_pve_net_config(raw)
            name = key

        bridge = parsed.get("bridge", "N/A")
        model = parsed.get("model", "veth")
        entities.append(Entity(vm_interface=VMInterface(
            virtual_machine=_vm_ref(vm_name, node_name, site_name, role_name),
            name=name,
            enabled=True,
            description=f"bridge={bridge}, model={model}",
            tags=["proxmox"],
        )))
    return entities


def build_vm_disk_entities(
    vm_name: str,
    vm_config: dict,
    vm_type: str,
    node_name: str = "",
    site_name: str = "",
) -> list[Entity]:
    """Build one VirtualDisk entity per real disk with a non-zero size."""
    role_name = "Virtual Machine" if vm_type == "qemu" else "LXC Container"
    entities = []
    for key, disk in _iter_disks(vm_config, vm_type):
        if disk["size_gb"] == 0:
            continue
        entities.append(Entity(virtual_disk=VirtualDisk(
            virtual_machine=_vm_ref(vm_name, node_name, site_name, role_name),
            name=key,
            size=disk["size_gb"],
            description=f"{disk['storage']}:{disk['volume']}",
            tags=["proxmox"],
        )))
    return entities


def build_vm_ip_entities_from_guest_agent(
    vm_name: str,
    agent_ifaces: list[dict],
    mac_map: dict[str, str],
    node_name: str = "",
    site_name: str = "",
) -> list[Entity]:
    """Build IPAddress entities from QEMU guest-agent interface data.

    Each guest interface is matched to its PVE ``netN`` key through its MAC
    address; unmatched interfaces keep the guest-side name. Loopback and
    IPv6 link-local addresses are skipped.
    """
    entities = []
    for ga_iface in agent_ifaces:
        ga_name = ga_iface.get("name", "unknown")
        mac = (ga_iface.get("hardware-address") or "").lower()
        pve_key = mac_map.get(mac, ga_name)

        for ip_info in ga_iface.get("ip-addresses") or []:
            addr = ip_info.get("ip-address", "")
            prefix = ip_info.get("prefix", 24)
            if _is_ignorable_address(addr):
                continue
            entities.append(Entity(ip_address=IPAddress(
                address=f"{addr}/{prefix}",
                status="active",
                assigned_object_vm_interface=VMInterface(
                    virtual_machine=_vm_ref(vm_name, node_name, site_name, "Virtual Machine"),
                    name=pve_key,
                ),
                tags=["proxmox", "guest-agent"],
            )))
    return entities


def _runtime_iface_cidrs(iface: dict) -> list[str]:
    """Return the usable CIDR strings reported for one runtime LXC interface."""
    cidrs = []
    for ip_info in iface.get("ip-addresses") or []:
        addr = ip_info.get("ip-address", "")
        if _is_ignorable_address(addr):
            continue
        cidrs.append(f"{addr}/{ip_info.get('prefix', '24')}")
    if cidrs:
        return cidrs

    # NOTE(review): PVE releases without "ip-addresses" appear to report the
    # addresses as "inet"/"inet6" CIDR strings instead - confirm against the
    # API schema of the targeted PVE version.
    for field in ("inet", "inet6"):
        value = iface.get(field)
        if isinstance(value, str) and not _is_ignorable_address(value.split("/", 1)[0]):
            cidrs.append(value)
    return cidrs


def build_lxc_ip_entities(
    vm_name: str,
    vm_config: dict,
    runtime_ifaces: list[dict] | None = None,
    node_name: str = "",
    site_name: str = "",
) -> list[Entity]:
    """Build IP entities for LXC from static config and/or runtime interfaces.

    Runtime data wins when it yields at least one address (it covers DHCP and
    static addresses). Otherwise the static ``ip``/``ip6`` options of the
    ``netN`` config entries are used, skipping 'dhcp', 'auto' and 'manual'.
    """
    vm = _vm_ref(vm_name, node_name, site_name, "LXC Container")

    def ip_entity(address: str, iface_name: str) -> Entity:
        return Entity(ip_address=IPAddress(
            address=address,
            status="active",
            assigned_object_vm_interface=VMInterface(
                virtual_machine=vm,
                name=iface_name,
            ),
            tags=["proxmox", "lxc"],
        ))

    # First try runtime interfaces (covers DHCP and static)
    entities = []
    for iface in runtime_ifaces or []:
        iface_name = iface.get("name", "unknown")
        if iface_name == "lo":
            continue
        entities.extend(ip_entity(cidr, iface_name) for cidr in _runtime_iface_cidrs(iface))
    if entities:
        return entities

    # Fallback: static IPs from config (stopped containers, or runtime data
    # that carried no usable address)
    for key, value in sorted(vm_config.items()):
        if not NET_KEY_RE.match(key):
            continue
        parsed = parse_lxc_net_config(str(value))
        iface_name = parsed.get("name", key)
        for ip_field in ("ip", "ip6"):
            ip_val = parsed.get(ip_field, "")
            if not ip_val or ip_val in ("dhcp", "auto", "manual"):
                continue
            entities.append(ip_entity(ip_val, iface_name))
    return entities


# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------

def _collect_qemu_entities(prox: ProxmoxAPI, node_name: str, site: str, vm: dict) -> list[Entity]:
    """Collect the VM, interface, disk and IP entities of one QEMU VM."""
    vmid = vm["vmid"]
    vm_cfg = collect_vm_config(prox, node_name, vmid)
    vm_name = _resolve_vm_name(vm, vm_cfg, "qemu")
    log.info(" VM %s (VMID %s)", vm_name, vmid)

    # Collect IPs first so we can set primary_ip4. The guest agent can only
    # answer while the VM runs, so stopped VMs are not queried at all.
    agent_data = None
    if vm.get("status", "running") == "running":
        agent_data = collect_vm_guest_agent_ips(prox, node_name, vmid)

    ip_entities = []
    if agent_data:
        mac_map = build_mac_to_netkey_map(vm_cfg)
        ip_entities = build_vm_ip_entities_from_guest_agent(
            vm_name, agent_data, mac_map, node_name, site)
        log.info(" Guest agent IPs collected")
    else:
        log.debug(" No guest agent for VM %s", vmid)

    entities = [build_vm_entity(vm, vm_cfg, node_name, site, "qemu", _first_ipv4(ip_entities))]
    entities.extend(build_vm_interface_entities(vm_name, vm_cfg, "qemu", node_name, site))
    entities.extend(build_vm_disk_entities(vm_name, vm_cfg, "qemu", node_name, site))
    entities.extend(ip_entities)
    return entities


def _collect_lxc_entities(prox: ProxmoxAPI, node_name: str, site: str, ct: dict) -> list[Entity]:
    """Collect the VM, interface, disk and IP entities of one LXC container."""
    vmid = ct["vmid"]
    ct_cfg = collect_lxc_config(prox, node_name, vmid)
    ct_name = _resolve_vm_name(ct, ct_cfg, "lxc")
    log.info(" CT %s (VMID %s)", ct_name, vmid)

    # Collect IPs first so we can set primary_ip4. Runtime interfaces only
    # exist for running containers; stopped ones use their static config.
    lxc_ifaces = None
    if ct.get("status", "running") == "running":
        lxc_ifaces = collect_lxc_interfaces(prox, node_name, vmid)
    ip_entities = build_lxc_ip_entities(ct_name, ct_cfg, lxc_ifaces, node_name, site)

    entities = [build_vm_entity(ct, ct_cfg, node_name, site, "lxc", _first_ipv4(ip_entities))]
    entities.extend(build_vm_interface_entities(ct_name, ct_cfg, "lxc", node_name, site))
    entities.extend(build_vm_disk_entities(ct_name, ct_cfg, "lxc", node_name, site))
    entities.extend(ip_entities)
    return entities


def _collect_node_entities(prox: ProxmoxAPI, node_name: str, site: str) -> list[Entity]:
    """Collect every entity of one node: cluster, device, interfaces, IPs and guests.

    A failure on a single guest is logged and skipped; failures of the
    node-level calls propagate to the caller.
    """
    entities: list[Entity] = []

    # Cluster
    entities.append(build_cluster_entity(node_name, site))

    # Node device
    node_status = collect_node_info(prox, node_name)
    entities.append(build_node_device_entity(node_name, node_status, site))

    # Node interfaces + IPs
    node_nets = collect_node_networks(prox, node_name)
    entities.extend(build_node_interface_entities(node_name, node_nets, site))
    entities.extend(build_node_ip_entities(node_name, node_nets, site))

    # QEMU VMs
    qemu_vms = collect_qemu_vms(prox, node_name)
    log.info(" QEMU VMs: %d", len(qemu_vms))
    for vm in qemu_vms:
        try:
            entities.extend(_collect_qemu_entities(prox, node_name, site, vm))
        except Exception:  # isolate guests from each other
            log.exception("Failed to process QEMU VM %s", vm.get("vmid"))

    # LXC containers
    lxc_cts = collect_lxc_containers(prox, node_name)
    log.info(" LXC containers: %d", len(lxc_cts))
    for ct in lxc_cts:
        try:
            entities.extend(_collect_lxc_entities(prox, node_name, site, ct))
        except Exception:  # isolate guests from each other
            log.exception("Failed to process LXC container %s", ct.get("vmid"))

    return entities


def collect_all_entities(prox: ProxmoxAPI, config: dict) -> list[Entity]:
    """Collect the entities of every reachable node.

    Nodes reported as offline are skipped, and a node whose collection fails
    is logged and skipped, so one bad cluster member does not abort the run.
    """
    site = config["site_name"]
    entities: list[Entity] = []

    nodes = prox.nodes.get()
    log.info("Found %d node(s)", len(nodes))

    for node_data in nodes:
        node_name = node_data["node"]
        if node_data.get("status") == "offline":
            log.warning("Skipping offline node: %s", node_name)
            continue
        log.info("Processing node: %s", node_name)
        try:
            entities.extend(_collect_node_entities(prox, node_name, site))
        except Exception:  # isolate nodes from each other
            log.exception("Failed to process node %s", node_name)

    return entities


def ingest_entities(entities: list[Entity], config: dict, dry_run: bool = False) -> bool:
    """Send entities to Diode, or hand them to the dry-run client.

    Returns:
        True on success (always for dry runs), False when the Diode response
        reports errors.
    """
    if dry_run:
        client = DiodeDryRunClient(app_name="proxmox-collector")
        log.info("Dry-run mode: writing entities to stdout")
        client.ingest(entities=entities)
        return True

    with DiodeClient(
        target=config["diode_target"],
        client_id=config["client_id"],
        client_secret=config["client_secret"],
        app_name="proxmox-collector",
        app_version="0.1.0",
    ) as client:
        log.info("Ingesting %d entities to %s ...", len(entities), config["diode_target"])
        response = client.ingest(entities=entities)
        if response.errors:
            log.error("Ingestion errors: %s", response.errors)
            return False
        log.info("Ingestion successful")
        return True


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main():
    """CLI entry point: parse arguments, collect from PVE, then ingest or dry-run.

    Exits with status 1 when the configuration is invalid or ingestion
    reports errors.
    """
    parser = argparse.ArgumentParser(description="Proxmox VE collector for NetBox via Diode")
    parser.add_argument("--dry-run", action="store_true", help="Output entities as JSON without ingesting")
    parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"])
    parser.add_argument("--env-file", default=".env", help="Path to .env file")
    args = parser.parse_args()

    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s %(levelname)-8s %(message)s",
    )

    load_dotenv(args.env_file)
    # A dry run never connects to Diode, so it must not demand Diode credentials.
    config = get_config(require_diode=not args.dry_run)

    log.info("Connecting to PVE at %s:%s ...", config["pve_host"], config["pve_port"])
    prox = connect_pve(config)

    log.info("Collecting entities from Proxmox VE...")
    entities = collect_all_entities(prox, config)
    log.info("Collected %d entities total", len(entities))

    if not entities:
        log.warning("No entities collected. Exiting.")
        return

    if not ingest_entities(entities, config, dry_run=args.dry_run):
        # Surface ingestion failures to cron/CI through the exit status.
        sys.exit(1)
    log.info("Done.")


if __name__ == "__main__":
    main()