#!/usr/bin/env python3 """Zabbix collector for NetBox via Diode SDK. Pulls device inventory from Zabbix API for brownfield import into NetBox. Also adds Zabbix host IDs as custom fields for cross-referencing. Usage: python collectors/zabbix_collector.py --dry-run python collectors/zabbix_collector.py """ import argparse import logging import os import sys from pyzabbix import ZabbixAPI from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient from netboxlabs.diode.sdk.ingester import ( CustomFieldValue, Device, DeviceRole, DeviceType, Entity, Interface, IPAddress, Manufacturer, Platform, Site, ) log = logging.getLogger("zabbix-collector") # --------------------------------------------------------------------------- # Zabbix → NetBox mappings # --------------------------------------------------------------------------- ZABBIX_STATUS_MAP = { 0: "active", # Monitored 1: "offline", # Unmonitored } ZABBIX_IFACE_TYPE = { 1: "agent", # Zabbix agent 2: "snmp", # SNMP 3: "ipmi", # IPMI 4: "jmx", # JMX } # Best-effort OS → Platform mapping from Zabbix template groups OS_KEYWORDS_TO_PLATFORM = { "linux": "Linux", "windows": "Windows", "cisco": "Cisco IOS", "juniper": "Juniper Junos", "freebsd": "FreeBSD", "vmware": "VMware ESXi", "proxmox": "Proxmox VE", "ubuntu": "Ubuntu", "debian": "Debian", "centos": "CentOS", "rhel": "Red Hat Enterprise Linux", } # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- def load_dotenv(path: str = ".env") -> None: if not os.path.isfile(path): return with open(path) as fh: for line in fh: line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, _, val = line.partition("=") os.environ.setdefault(key.strip(), val.strip().strip("\"'")) def get_config() -> dict: return { "url": os.environ.get("ZABBIX_URL", ""), "user": os.environ.get("ZABBIX_USER", "Admin"), "password": os.environ.get("ZABBIX_PASSWORD", ""), "api_token": os.environ.get("ZABBIX_API_TOKEN", ""), "site": os.environ.get("ZABBIX_SITE", "main"), "default_role": os.environ.get("ZABBIX_DEFAULT_ROLE", "Server"), "group_to_role": {}, # Could be loaded from config } # --------------------------------------------------------------------------- # Device reference helper # --------------------------------------------------------------------------- def _device_ref(name: str, model: str, manufacturer: str, role: str, site_name: str) -> Device: return Device( name=name, device_type=DeviceType( model=model, manufacturer=Manufacturer(name=manufacturer), ), role=DeviceRole(name=role), site=Site(name=site_name), ) # --------------------------------------------------------------------------- # Data collection # --------------------------------------------------------------------------- def connect_zabbix(cfg: dict) -> ZabbixAPI: url = cfg["url"] if not url: log.error("ZABBIX_URL not set") sys.exit(1) zapi = ZabbixAPI(url) zapi.session.verify = False if cfg.get("api_token"): zapi.login(api_token=cfg["api_token"]) else: zapi.login(cfg["user"], cfg["password"]) log.info("Connected to Zabbix %s", zapi.api_version()) return zapi def collect_hosts(zapi: ZabbixAPI) -> list[dict]: """Fetch all hosts with their interfaces and inventory data.""" hosts = zapi.host.get( output=["hostid", "host", "name", "status", "description"], selectInterfaces=["interfaceid", "ip", "dns", "port", "type", "main", "useip"], selectInventory="extend", selectGroups=["name"], selectParentTemplates=["name"], ) log.info("Fetched %d hosts from Zabbix", len(hosts)) return hosts # --------------------------------------------------------------------------- # Entity builders # --------------------------------------------------------------------------- def guess_platform(host_data: dict) -> str | None: """Try to guess platform from Zabbix templates/groups/inventory.""" # Check inventory OS field inventory = host_data.get("inventory") or {} if isinstance(inventory, dict): os_full = inventory.get("os_full") or inventory.get("os_short") or "" for keyword, platform in OS_KEYWORDS_TO_PLATFORM.items(): if keyword.lower() in os_full.lower(): return platform # Check template names templates = host_data.get("parentTemplates") or [] for tmpl in templates: tmpl_name = (tmpl.get("name") or "").lower() for keyword, platform in OS_KEYWORDS_TO_PLATFORM.items(): if keyword in tmpl_name: return platform return None def guess_role(host_data: dict, default_role: str) -> str: """Try to guess device role from Zabbix groups.""" groups = host_data.get("groups") or [] for group in groups: gname = (group.get("name") or "").lower() if "router" in gname: return "Router" if "switch" in gname: return "Switch" if "firewall" in gname: return "Firewall" if "hypervisor" in gname: return "Hypervisor" if "server" in gname or "linux" in gname or "windows" in gname: return "Server" return default_role def build_host_entities(host_data: dict, cfg: dict) -> list[Entity]: """Build Device + Interface + IPAddress entities from a Zabbix host.""" entities = [] site_name = cfg["site"] default_role = cfg["default_role"] hostname = host_data.get("name") or host_data.get("host", "unknown") status = ZABBIX_STATUS_MAP.get(int(host_data.get("status", 0)), "active") host_id = host_data.get("hostid", "") # Inventory data inventory = host_data.get("inventory") or {} if isinstance(inventory, list): inventory = {} serial = inventory.get("serialno_a") or "" model = inventory.get("model") or inventory.get("hardware") or "Unknown" vendor = inventory.get("vendor") or inventory.get("hardware_full") or "Unknown" asset_tag = inventory.get("asset_tag") or "" role = guess_role(host_data, default_role) platform = guess_platform(host_data) # Custom fields for cross-referencing custom_fields = {} if host_id: custom_fields["zabbix_host_id"] = CustomFieldValue(text=str(host_id)) # Device entity device_kwargs = dict( name=hostname, device_type=DeviceType( model=model, manufacturer=Manufacturer(name=vendor), ), role=DeviceRole(name=role), site=Site(name=site_name), status=status, serial=serial[:50] if serial else "", asset_tag=asset_tag[:50] if asset_tag else "", tags=["zabbix"], ) if platform: device_kwargs["platform"] = Platform(name=platform) if custom_fields: device_kwargs["custom_fields"] = custom_fields entities.append(Entity(device=Device(**device_kwargs))) # Interface + IP entities from Zabbix interfaces dev_ref = _device_ref(hostname, model, vendor, role, site_name) primary_ip = None for iface in host_data.get("interfaces", []): iface_type_num = int(iface.get("type", 1)) iface_type_name = ZABBIX_IFACE_TYPE.get(iface_type_num, "agent") ip = iface.get("ip", "") is_main = str(iface.get("main", "0")) == "1" if not ip or ip == "0.0.0.0": continue # Interface name based on type iface_name = f"zabbix-{iface_type_name}" if is_main and iface_type_name == "agent": iface_name = "mgmt0" elif iface_type_name == "snmp": iface_name = "snmp0" elif iface_type_name == "ipmi": iface_name = "ipmi0" entities.append(Entity(interface=Interface( device=dev_ref, name=iface_name, type="other", enabled=True, tags=["zabbix"], ))) # IP address ip_str = f"{ip}/32" # Zabbix doesn't provide prefix length entities.append(Entity(ip_address=IPAddress( address=ip_str, status="active", assigned_object_interface=Interface( device=dev_ref, name=iface_name, type="other", ), tags=["zabbix"], ))) if is_main and primary_ip is None: primary_ip = ip_str return entities # --------------------------------------------------------------------------- # Orchestration # --------------------------------------------------------------------------- def collect_all_entities(cfg: dict) -> list[Entity]: zapi = connect_zabbix(cfg) hosts = collect_hosts(zapi) entities: list[Entity] = [] for host in hosts: try: entities.extend(build_host_entities(host, cfg)) except Exception as exc: hostname = host.get("name") or host.get("host", "?") log.error("Failed to process Zabbix host %s: %s", hostname, exc) return entities def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: if not entities: log.warning("No entities to ingest") return target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode") client_id = os.environ.get("DIODE_CLIENT_ID", os.environ.get("INGESTER_CLIENT_ID", "diode-ingester")) client_secret = os.environ.get("DIODE_CLIENT_SECRET", os.environ.get("INGESTER_CLIENT_SECRET", "")) if dry_run: log.info("DRY RUN: %d entities would be ingested", len(entities)) for i, e in enumerate(entities): log.info(" [%d] %s", i, e) return if not client_secret: log.error("DIODE_CLIENT_SECRET not set — cannot ingest") sys.exit(1) log.info("Ingesting %d entities to %s ...", len(entities), target) with DiodeClient( target=target, client_id=client_id, client_secret=client_secret, app_name="zabbix-collector", app_version="0.1.0", ) as client: resp = client.ingest(entities=entities) if resp.errors: log.error("Ingestion errors: %s", resp.errors) else: log.info("Ingestion successful") def main(): parser = argparse.ArgumentParser(description="Zabbix collector for NetBox") parser.add_argument("--dry-run", action="store_true") parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"]) parser.add_argument("--env-file", default=".env") args = parser.parse_args() logging.basicConfig( level=getattr(logging, args.log_level), format="%(asctime)s %(name)s %(levelname)s %(message)s", ) load_dotenv(args.env_file) cfg = get_config() entities = collect_all_entities(cfg) log.info("Total entities: %d", len(entities)) ingest_entities(entities, dry_run=args.dry_run) log.info("Done!") if __name__ == "__main__": main()