The installed Diode SDK version does not export create_message_chunks. Replace chunked ingestion with direct client.ingest() calls. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
379 lines
12 KiB
Python
379 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""Observium collector for NetBox via Diode SDK.
|
|
|
|
Pulls device, port, and IP data from the Observium REST API for brownfield
|
|
import into NetBox. Adds Observium device IDs as custom fields for cross-referencing.
|
|
|
|
Note: Observium API requires Professional or Enterprise edition.
|
|
Community Edition users can skip this collector.
|
|
|
|
Usage:
|
|
python collectors/observium_collector.py --dry-run
|
|
python collectors/observium_collector.py
|
|
"""
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
import requests
|
|
from requests.auth import HTTPBasicAuth
|
|
|
|
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
|
|
from netboxlabs.diode.sdk.ingester import (
|
|
CustomFieldValue,
|
|
Device,
|
|
DeviceRole,
|
|
DeviceType,
|
|
Entity,
|
|
Interface,
|
|
IPAddress,
|
|
Manufacturer,
|
|
Platform,
|
|
Site,
|
|
VLAN,
|
|
)
|
|
|
|
log = logging.getLogger("observium-collector")

# ---------------------------------------------------------------------------
# Observium → NetBox mappings
# ---------------------------------------------------------------------------

# Observium reports device reachability as the strings "1" (up) / "0" (down).
_OBSERVIUM_UP, _OBSERVIUM_DOWN = "1", "0"

# Observium device status flag → NetBox device status slug.
OBSERVIUM_STATUS_MAP = {
    _OBSERVIUM_UP: "active",
    _OBSERVIUM_DOWN: "offline",
}
|
|
|
|
# NetBox interface type slug → the SNMP ifType names that map onto it.
_IF_TYPES_BY_NETBOX_TYPE = (
    ("1000base-t", ("ethernetCsmacd", "gigabitEthernet")),
    ("100base-tx", ("fastEthernet",)),
    ("virtual", ("propVirtual", "l3ipvlan", "tunnel", "softwareLoopback")),
    ("lag", ("ieee8023adLag",)),
    ("bridge", ("bridge",)),
    ("other", ("other",)),
)

# SNMP ifType name → NetBox interface type slug.
IF_TYPE_MAP = {
    snmp_if_type: netbox_type
    for netbox_type, snmp_if_types in _IF_TYPES_BY_NETBOX_TYPE
    for snmp_if_type in snmp_if_types
}
|
|
|
|
# Observium "os" identifier → NetBox platform name.
OS_TO_PLATFORM = dict(
    ios="Cisco IOS",
    iosxe="Cisco IOS-XE",
    iosxr="Cisco IOS-XR",
    nxos="Cisco NX-OS",
    junos="Juniper Junos",
    linux="Linux",
    vmware="VMware ESXi",
    ironware="Brocade IronWare",
    fastiron="Brocade FastIron",
    windows="Windows",
    freebsd="FreeBSD",
    proxmox="Proxmox VE",
)
|
|
|
|
# Observium "os" identifier → manufacturer name, used when the device record
# carries no vendor of its own.
OS_TO_MANUFACTURER = {
    **dict.fromkeys(("ios", "iosxe", "iosxr", "nxos"), "Cisco"),
    "junos": "Juniper",
    **dict.fromkeys(("ironware", "fastiron"), "Brocade"),
}
|
|
|
|
# Observium "os" identifier → NetBox device role name.
OS_TO_ROLE = dict(
    ios="Router",
    iosxe="Router",
    iosxr="Router",
    nxos="Switch",
    junos="Router",
    ironware="Switch",
    fastiron="Switch",
    linux="Server",
    vmware="Hypervisor",
    windows="Server",
    proxmox="Hypervisor",
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def load_dotenv(path: str = ".env") -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored, as are lines whose key is empty (``=value``). Surrounding
    whitespace and quote characters are stripped from the value. Variables
    already present in the environment are never overwritten, and a missing
    file is skipped without error.

    Args:
        path: Location of the dotenv file.
    """
    if not os.path.isfile(path):
        return

    # Explicit encoding so parsing does not depend on the platform locale.
    with open(path, encoding="utf-8") as fh:
        for raw_line in fh:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue

            key, _, val = line.partition("=")
            key = key.strip()
            if not key:
                # os.environ rejects an empty variable name; skip the line
                # instead of aborting the whole load.
                continue

            os.environ.setdefault(key, val.strip().strip("\"'"))
|
|
|
|
|
|
def get_config() -> dict:
    """Assemble collector settings from the ``OBSERVIUM_*`` environment variables.

    Returns:
        A dict with the keys ``url``, ``user``, ``password``, ``site`` and
        ``default_role``; unset variables fall back to built-in defaults.
    """
    # (config key, environment variable, default when unset)
    settings = (
        ("url", "OBSERVIUM_URL", ""),
        ("user", "OBSERVIUM_USER", "admin"),
        ("password", "OBSERVIUM_PASSWORD", ""),
        ("site", "OBSERVIUM_SITE", "main"),
        ("default_role", "OBSERVIUM_DEFAULT_ROLE", "Network Device"),
    )
    return {key: os.environ.get(var, fallback) for key, var, fallback in settings}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# API helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def api_get(base_url: str, endpoint: str, auth: HTTPBasicAuth,
            params: dict | None = None, *, timeout: float = 30,
            verify: bool | str = False) -> dict:
    """Make a GET request to the Observium API and return the decoded JSON.

    Args:
        base_url: API root URL; a trailing slash is tolerated.
        endpoint: Path below ``base_url``; a leading slash is tolerated.
        auth: HTTP basic-auth credentials.
        params: Optional query-string parameters.
        timeout: Seconds to wait for the server before giving up.
        verify: Passed to ``requests`` as the TLS verification setting
            (``True``, ``False`` or a CA bundle path).

    Returns:
        The parsed JSON response body.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection or timeout failures.
    """
    # NOTE(security): ``verify`` defaults to False to keep the existing
    # behaviour. With verification off, the basic-auth credentials are sent
    # to a server whose identity is not checked. Pass ``verify=True`` or a
    # CA bundle path wherever the certificate can be validated.
    url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"
    resp = requests.get(url, auth=auth, params=params, timeout=timeout, verify=verify)
    resp.raise_for_status()
    return resp.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Device reference helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _device_ref(name: str, model: str, manufacturer: str, role: str,
                site_name: str) -> Device:
    """Build a ``Device`` holding only name, type, role and site.

    Child entities (interfaces, IP addresses) use the result to point at
    their parent device.
    """
    hardware = DeviceType(model=model, manufacturer=Manufacturer(name=manufacturer))
    device_role = DeviceRole(name=role)
    location = Site(name=site_name)
    return Device(name=name, device_type=hardware, role=device_role, site=location)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data collection and entity building
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _index_by_id(records, id_field: str) -> dict:
    """Normalise an API collection to a ``{str(id): record}`` dict.

    The collection may arrive as a dict keyed by ID or as a plain list. List
    items are keyed by their own ``id_field`` value so that IDs stay
    meaningful for cross-references; the list position is used only when the
    field is missing. Non-dict records are dropped.
    """
    if isinstance(records, dict):
        pairs = records.items()
    elif isinstance(records, list):
        pairs = (
            (rec.get(id_field, idx) if isinstance(rec, dict) else idx, rec)
            for idx, rec in enumerate(records)
        )
    else:
        return {}
    return {str(key): rec for key, rec in pairs if isinstance(rec, dict)}


def _to_int(value, default: int = 0) -> int:
    """Convert an API value to ``int``, returning ``default`` when it is empty or malformed."""
    try:
        return int(value or default)
    except (TypeError, ValueError):
        return default


def _load_speed_to_type() -> dict:
    """Import the shared speed (Mbps) → interface-type table, tolerating its absence."""
    try:
        from collectors.network_collector import SPEED_TO_TYPE
    except ImportError:
        try:
            # When this file is run as a script, its own directory (not the
            # repository root) is on sys.path, so the package-qualified
            # import above fails.
            from network_collector import SPEED_TO_TYPE
        except ImportError:
            log.warning("network_collector.SPEED_TO_TYPE unavailable; "
                        "unmapped interface types stay 'other'")
            return {}
    return SPEED_TO_TYPE


def collect_all_entities(cfg: dict) -> list[Entity]:
    """Fetch devices, ports and IP addresses from Observium as Diode entities.

    Devices are required: if they cannot be fetched the process exits.
    Ports and IP addresses are best-effort; failures there are logged and
    the affected items are skipped.

    Args:
        cfg: Settings dict with the keys ``url``, ``user``, ``password``,
            ``site`` and ``default_role``.

    Returns:
        Device entities first, then interface entities, then IP address
        entities.

    Raises:
        SystemExit: If ``cfg["url"]`` is empty or the device list cannot be
            fetched.
    """
    base_url = cfg["url"].rstrip("/")
    if not base_url:
        log.error("OBSERVIUM_URL not set")
        sys.exit(1)

    auth = HTTPBasicAuth(cfg["user"], cfg["password"])
    site_name = cfg["site"]
    default_role = cfg["default_role"]
    entities: list[Entity] = []

    # --- Devices ---
    log.info("Fetching devices from Observium...")
    try:
        devices_resp = api_get(base_url, "devices", auth)
    except Exception as exc:
        log.error("Failed to fetch devices: %s", exc)
        sys.exit(1)

    devices = _index_by_id(devices_resp.get("devices", {}), "device_id")
    log.info("Found %d devices", len(devices))

    # dev_id -> (hostname, model, vendor, role); reused to build the device
    # reference attached to every interface and IP address.
    device_refs = {}

    for dev_id, dev_data in devices.items():
        hostname = dev_data.get("hostname") or dev_data.get("sysName") or f"device-{dev_id}"
        os_type = dev_data.get("os") or ""
        model = dev_data.get("hardware") or "Unknown"
        vendor = dev_data.get("vendor") or OS_TO_MANUFACTURER.get(os_type, "Unknown")
        role = OS_TO_ROLE.get(os_type, default_role)
        serial = dev_data.get("serial") or ""
        # str() so that an integer 1/0 from the API maps the same way as "1"/"0".
        status = OBSERVIUM_STATUS_MAP.get(str(dev_data.get("status")), "offline")
        platform = OS_TO_PLATFORM.get(os_type)

        device_refs[dev_id] = (hostname, model, vendor, role)

        device_kwargs = dict(
            name=hostname,
            device_type=DeviceType(
                model=model,
                manufacturer=Manufacturer(name=vendor),
            ),
            role=DeviceRole(name=role),
            site=Site(name=site_name),
            serial=serial[:50],
            status=status,
            tags=["observium"],
            custom_fields={
                "observium_device_id": CustomFieldValue(text=str(dev_id)),
            },
        )
        if platform:
            device_kwargs["platform"] = Platform(name=platform)

        entities.append(Entity(device=Device(**device_kwargs)))

    # --- Ports (interfaces) ---
    log.info("Fetching ports from Observium...")
    try:
        ports_resp = api_get(base_url, "ports", auth)
        # NOTE(review): assumes list-shaped port records carry "port_id";
        # falls back to the list position otherwise — confirm against the API.
        ports = _index_by_id(ports_resp.get("ports", {}), "port_id")
        log.info("Found %d ports", len(ports))
    except Exception as exc:
        log.warning("Failed to fetch ports: %s", exc)
        ports = {}

    speed_to_type = None  # loaded lazily, at most once
    port_id_to_info = {}

    for port_id, port_data in ports.items():
        dev_id = str(port_data.get("device_id", ""))
        ref = device_refs.get(dev_id)
        if ref is None:
            continue  # port belongs to a device we did not collect
        hostname, model, vendor, role = ref

        # Truncate once so the interface entity and the IP address's
        # interface reference agree on the name.
        iface_name = (port_data.get("ifName") or port_data.get("port_label")
                      or f"port{port_id}")[:64]
        if_type = port_data.get("ifType") or "other"
        mac = port_data.get("ifPhysAddress") or ""
        speed_bps = _to_int(port_data.get("ifSpeed"))
        speed_mbps = speed_bps // 1_000_000
        mtu = _to_int(port_data.get("ifMtu"))
        description = port_data.get("ifAlias") or ""
        enabled = port_data.get("ifAdminStatus") == "up"

        iface_type = IF_TYPE_MAP.get(if_type, "other")
        if iface_type == "other" and speed_mbps:
            if speed_to_type is None:
                speed_to_type = _load_speed_to_type()
            iface_type = speed_to_type.get(speed_mbps, "other")

        entities.append(Entity(interface=Interface(
            device=_device_ref(hostname, model, vendor, role, site_name),
            name=iface_name,
            type=iface_type,
            mac_address=mac,
            mtu=mtu,
            speed=speed_bps // 1000,  # bps → Kbps, keeps sub-Mbps links non-zero
            enabled=enabled,
            description=description[:200],
            tags=["observium"],
        )))

        port_id_to_info[port_id] = (hostname, iface_name, iface_type, model, vendor, role)

    # --- IP addresses ---
    log.info("Fetching IP addresses from Observium...")
    # Observium may expose IPs through the ports/addresses endpoint
    for port_id, (hostname, iface_name, iface_type, model, vendor, role) in port_id_to_info.items():
        try:
            addr_resp = api_get(base_url, f"ports/{port_id}/ip", auth)
            addresses = addr_resp.get("addresses", {})
            if isinstance(addresses, dict):
                addresses = list(addresses.values())

            for addr_data in addresses:
                # Pair each address with the prefix length of its own family.
                for family in ("ipv4", "ipv6"):
                    ip = addr_data.get(f"{family}_address")
                    prefix_len = addr_data.get(f"{family}_prefixlen")
                    if ip and prefix_len:
                        break
                else:
                    continue

                entities.append(Entity(ip_address=IPAddress(
                    address=f"{ip}/{prefix_len}",
                    status="active",
                    assigned_object_interface=Interface(
                        device=_device_ref(hostname, model, vendor, role, site_name),
                        name=iface_name,
                        type=iface_type,
                    ),
                    tags=["observium"],
                )))
        except Exception as exc:
            # IP endpoint may not be available for all ports; keep going but
            # leave a trace for troubleshooting.
            log.debug("Skipping IP addresses for port %s: %s", port_id, exc)

    return entities
|
|
|
|
|
|
def ingest_entities(entities: list[Entity], dry_run: bool = False, *,
                    batch_size: int = 1000) -> None:
    """Send collected entities to Diode, or only log them in dry-run mode.

    Connection settings come from the environment: ``DIODE_TARGET``,
    ``DIODE_CLIENT_ID`` (fallback ``INGESTER_CLIENT_ID``) and
    ``DIODE_CLIENT_SECRET`` (fallback ``INGESTER_CLIENT_SECRET``).

    Args:
        entities: Entities to ingest; an empty list is a logged no-op.
        dry_run: When true, log every entity and send nothing.
        batch_size: Maximum number of entities per ``client.ingest()`` call.
            Batching keeps each request small so a large inventory is not
            sent as one oversized gRPC message.

    Raises:
        SystemExit: If no client secret is configured for a real run.
    """
    if not entities:
        log.warning("No entities to ingest")
        return

    if dry_run:
        log.info("DRY RUN: %d entities would be ingested", len(entities))
        for i, e in enumerate(entities):
            log.info(" [%d] %s", i, e)
        return

    target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
    client_id = os.environ.get("DIODE_CLIENT_ID",
                               os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"))
    client_secret = os.environ.get("DIODE_CLIENT_SECRET",
                                   os.environ.get("INGESTER_CLIENT_SECRET", ""))

    if not client_secret:
        log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
        sys.exit(1)

    batch_size = max(1, batch_size)
    log.info("Ingesting %d entities to %s ...", len(entities), target)

    failed_batches = 0
    with DiodeClient(
        target=target,
        client_id=client_id,
        client_secret=client_secret,
        app_name="observium-collector",
        app_version="0.1.0",
    ) as client:
        for start in range(0, len(entities), batch_size):
            batch = entities[start:start + batch_size]
            resp = client.ingest(entities=batch)
            if resp.errors:
                failed_batches += 1
                log.error("Ingestion errors (entities %d-%d): %s",
                          start, start + len(batch) - 1, resp.errors)

    if not failed_batches:
        log.info("Ingestion successful")
|
|
|
|
|
|
def main():
    """Command-line entry point: parse flags, collect from Observium, ingest."""
    cli = argparse.ArgumentParser(description="Observium collector for NetBox")
    cli.add_argument("--dry-run", action="store_true")
    cli.add_argument(
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
    )
    cli.add_argument("--env-file", default=".env")
    opts = cli.parse_args()

    level = getattr(logging, opts.log_level)
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )

    # Load the env file before reading settings so its values are visible.
    load_dotenv(opts.env_file)
    settings = get_config()

    collected = collect_all_entities(settings)
    log.info("Total entities: %d", len(collected))

    ingest_entities(collected, dry_run=opts.dry_run)
    log.info("Done!")
|
|
|
|
|
|
# Run the collector only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|