netbox-diode-project/collectors/pbs_collector.py
sam 5748bad765 Add PBS collector, multi-host PVE support, and collector fixes
- proxmox_collector: support numbered PVE_HOST_1/2/3 env vars with
  backward compat for legacy single PVE_HOST; fix MTU string-to-int cast
- pbs_collector: new collector for Proxmox Backup Server — discovers
  devices, interfaces, IPs, and datastores (as Services) via PBS API
- vmware_collector: fix mac_address → primary_mac_address for Diode SDK
- network_collector: add Netmiko SSH fallback for Brocade/NOS devices,
  add Brocade ICX interface type patterns
- unifi_collector: new collector for UniFi UDM-SE/switches/APs
- ENV_REFERENCE.md: document all collector env vars and setup steps
- .gitignore: exclude collectors/inventory.yaml (contains credentials)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 16:10:12 -07:00

465 lines
15 KiB
Python

#!/usr/bin/env python3
"""Proxmox Backup Server collector for NetBox via Diode SDK.
Discovers PBS hosts, network interfaces, IPs, and datastores
and ingests them into NetBox through the Diode pipeline.
Usage:
python collectors/pbs_collector.py --dry-run
python collectors/pbs_collector.py
"""
import argparse
import logging
import os
import sys
from proxmoxer import ProxmoxAPI
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
Device,
DeviceRole,
DeviceType,
Entity,
Interface,
IPAddress,
Manufacturer,
Platform,
Service,
Site,
)
log = logging.getLogger("pbs-collector")
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Manufacturer and model names used for every PBS device, device type and
# platform emitted by this collector.
PBS_MANUFACTURER = "Proxmox Server Solutions GmbH"
PBS_MODEL = "Proxmox Backup Server"
# Maps the network API's interface ``type`` field to a NetBox interface type
# slug. Types not listed here fall back to the name-based guess in
# ``_map_interface_type``.
IFACE_TYPE_MAP = {
    "eth": "1000base-t",  # physical NIC; the real link speed is not known here
    "bond": "lag",
    "bridge": "bridge",
    "vlan": "virtual",
    # IP alias interfaces are logical, not physical ports.
    "alias": "virtual",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
    """Load ``KEY=VALUE`` pairs from *path* into ``os.environ``.

    Variables already present in the environment win (``setdefault``), so
    real environment variables override the file. Blank lines, ``#`` comment
    lines and lines without ``=`` are ignored. An optional leading
    ``export `` is accepted. A value wrapped in a *matching* pair of single or
    double quotes is unwrapped; other quote characters are kept verbatim,
    so secrets that contain quotes survive intact. A missing file is ignored.
    """
    if not os.path.isfile(path):
        return
    # utf-8-sig also swallows a BOM written by some Windows editors.
    with open(path, encoding="utf-8-sig") as fh:
        for raw in fh:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, val = line.partition("=")
            key = key.strip()
            if key.startswith("export "):
                key = key[len("export "):].strip()
            if not key:
                # os.environ rejects empty names with ValueError; skip "=foo".
                continue
            val = val.strip()
            if len(val) >= 2 and val[0] == val[-1] and val[0] in "\"'":
                val = val[1:-1]
            os.environ.setdefault(key, val)
def get_diode_config() -> dict:
    """Return the Diode connection settings from the environment.

    Each setting is read from its preferred variable first and then from its
    fallback (``INGESTER_*`` before ``DIODE_*``). Empty values count as unset,
    so a blank ``INGESTER_CLIENT_SECRET=`` line in a .env template does not
    hide a real ``DIODE_CLIENT_SECRET``.

    Exits the process with status 1 when no client secret is available.
    """

    def first_set(*names, default=None):
        # First non-empty value among *names*, else *default*.
        for name in names:
            value = os.getenv(name)
            if value:
                return value
        return default

    cfg = {
        "diode_target": first_set("DIODE_TARGET", default="grpc://localhost:8080/diode"),
        "client_id": first_set("INGESTER_CLIENT_ID", "DIODE_CLIENT_ID", default="diode-ingester"),
        "client_secret": first_set("INGESTER_CLIENT_SECRET", "DIODE_CLIENT_SECRET"),
    }
    if not cfg["client_secret"]:
        log.error("Missing required env var: INGESTER_CLIENT_SECRET or DIODE_CLIENT_SECRET")
        sys.exit(1)
    return cfg
_TRUTHY = ("true", "1", "yes")


def _read_pbs_host(suffix: str, label: str) -> dict:
    """Build one PBS host config from env vars ending in *suffix*.

    *suffix* is ``"_N"`` for numbered hosts or ``""`` for the legacy
    single-host variables. Per-host settings fall back to the un-numbered
    variable (``PBS_USER_1`` -> ``PBS_USER``) and then to a built-in
    default. The host name itself has no fallback. The site comes from
    ``PBS_SITE_N`` (numbered hosts only), then ``SITE_NAME``, then ``"main"``.
    *label* names the source env var in error messages.

    Exits the process with status 1 if the port is not an integer.
    """

    def setting(name: str, default=None):
        # Numbered value first, then the shared un-numbered one.
        value = os.getenv(name + suffix) if suffix else None
        return os.getenv(name, default) if value is None else value

    host = os.getenv("PBS_HOST" + suffix)
    site = os.getenv("PBS_SITE" + suffix) if suffix else None
    if site is None:
        site = os.getenv("SITE_NAME", "main")
    raw_port = setting("PBS_PORT", "8007")
    try:
        port = int(raw_port)
    except ValueError:
        log.error("PBS host %s (%s): invalid port %r", label, host, raw_port)
        sys.exit(1)
    return {
        "pbs_host": host,
        "pbs_user": setting("PBS_USER", "root@pam"),
        "pbs_token_name": setting("PBS_TOKEN_NAME"),
        "pbs_token_value": setting("PBS_TOKEN_VALUE"),
        "pbs_verify_ssl": setting("PBS_VERIFY_SSL", "false").lower() in _TRUTHY,
        "pbs_port": port,
        "site_name": site,
    }


def get_pbs_hosts() -> list[dict]:
    """Build the list of PBS host configs from environment variables.

    Numbered hosts ``PBS_HOST_1`` ... are scanned in order. Gaps of up to two
    missing numbers are tolerated, and scanning stops after three consecutive
    misses. A legacy single ``PBS_HOST`` is prepended unless the same address
    is already configured as a numbered host.

    Returns:
        One dict per host with keys ``pbs_host``, ``pbs_user``,
        ``pbs_token_name``, ``pbs_token_value``, ``pbs_verify_ssl``,
        ``pbs_port`` and ``site_name``.

    Exits the process with status 1 if no host is configured, or if any host
    lacks an address, token name or token value.
    """
    # (source env var, config) pairs so that errors name the actual variable.
    labelled: list[tuple[str, dict]] = []
    misses = 0
    for i in range(1, 100):
        var = f"PBS_HOST_{i}"
        if os.getenv(var) is None:
            misses += 1
            if misses >= 3:
                break
            continue
        misses = 0
        labelled.append((var, _read_pbs_host(f"_{i}", var)))

    # Backward compatibility with the single-host PBS_HOST variable.
    legacy_host = os.getenv("PBS_HOST")
    if legacy_host and all(cfg["pbs_host"] != legacy_host for _, cfg in labelled):
        labelled.insert(0, ("PBS_HOST", _read_pbs_host("", "PBS_HOST")))

    if not labelled:
        log.error("No PBS hosts configured (set PBS_HOST or PBS_HOST_1)")
        sys.exit(1)
    for label, cfg in labelled:
        missing = [k for k in ("pbs_host", "pbs_token_name", "pbs_token_value") if not cfg.get(k)]
        if missing:
            log.error("PBS host %s (%s): missing %s",
                      label, cfg.get("pbs_host") or "?", ", ".join(missing))
            sys.exit(1)
    return [cfg for _, cfg in labelled]
# ---------------------------------------------------------------------------
# PBS connection
# ---------------------------------------------------------------------------
def connect_pbs(config: dict) -> ProxmoxAPI:
    """Open an API-token session to the PBS host described by *config*.

    *config* is one entry as produced by ``get_pbs_hosts``. The session uses
    proxmoxer's ``https`` backend with ``service="PBS"``.
    """
    address = config["pbs_host"]
    options = {
        "service": "PBS",
        "port": config["pbs_port"],
        "user": config["pbs_user"],
        "token_name": config["pbs_token_name"],
        "token_value": config["pbs_token_value"],
        "verify_ssl": config["pbs_verify_ssl"],
        "backend": "https",
    }
    return ProxmoxAPI(address, **options)
# ---------------------------------------------------------------------------
# Data collection (pure PBS API calls)
# ---------------------------------------------------------------------------
def collect_node_status(pbs: ProxmoxAPI) -> dict:
    """Fetch ``GET /nodes/localhost/status`` through the given session."""
    node = pbs.nodes("localhost")
    return node.status.get()
def collect_node_networks(pbs: ProxmoxAPI) -> list[dict]:
    """Fetch the interface list from ``GET /nodes/localhost/network``."""
    node = pbs.nodes("localhost")
    return node.network.get()
def collect_datastores(pbs: ProxmoxAPI) -> list[dict]:
    """Fetch the datastore list from ``GET /admin/datastore``.

    NOTE(review): entries from this endpoint may carry only the store name
    and not configuration fields such as ``path`` or ``gc-schedule``. Confirm
    against the PBS API before relying on those downstream.
    """
    datastore_endpoint = pbs.admin.datastore
    return datastore_endpoint.get()
def collect_datastore_usage(pbs: ProxmoxAPI) -> list[dict]:
    """Return per-datastore usage from ``GET /status/datastore-usage``.

    Best-effort: any exception is logged at DEBUG level and an empty list is
    returned, so datastores can still be emitted without usage numbers.
    """
    usage = []
    try:
        usage = pbs.status("datastore-usage").get()
    except Exception as err:
        log.debug("Datastore usage endpoint unavailable: %s", err)
    return usage
def collect_datastore_snapshots(pbs: ProxmoxAPI, store: str) -> list[dict]:
    """List the snapshots stored in datastore *store*.

    Best-effort: any exception is logged at DEBUG level and an empty list is
    returned instead.
    """
    snapshots = []
    try:
        snapshots = pbs.admin.datastore(store).snapshots.get()
    except Exception as err:
        log.debug("Cannot get snapshots for %s: %s", store, err)
    return snapshots
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _device_ref(hostname: str, site_name: str) -> Device:
    """Return a minimal Device reference for attaching child objects.

    It repeats the identifying fields (name, device type, role, site) that
    ``build_device_entity`` also sets. Interfaces, IPs and services therefore
    reference the device with the same identity.
    """
    vendor = Manufacturer(name=PBS_MANUFACTURER)
    device_type = DeviceType(model=PBS_MODEL, manufacturer=vendor)
    return Device(
        name=hostname,
        device_type=device_type,
        role=DeviceRole(name="Backup Server"),
        site=Site(name=site_name),
    )
def _map_interface_type(pve_type: str, iface_name: str) -> str:
    """Translate an API interface type and name into a NetBox type slug.

    The API's own ``type`` wins when it is a key of ``IFACE_TYPE_MAP``.
    Otherwise the name is used as a hint: common NIC prefixes become
    ``1000base-t`` and ``vmbr*`` becomes ``bridge``. Everything else is
    ``other``.
    """
    try:
        return IFACE_TYPE_MAP[pve_type]
    except KeyError:
        pass
    name_hints = (
        (("eno", "enp", "ens", "eth"), "1000base-t"),
        (("vmbr",), "bridge"),
    )
    for prefixes, slug in name_hints:
        if iface_name.startswith(prefixes):
            return slug
    return "other"
def _netmask_to_prefix(mask: str) -> int:
try:
return sum(bin(int(o)).count("1") for o in mask.split("."))
except (ValueError, AttributeError):
return 24
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def build_device_entity(hostname: str, node_status: dict, site_name: str) -> Entity:
    """Build the Device entity for one PBS host from its node-status payload.

    Hardware details (CPU model, core count, memory) go into ``comments``.
    The PBS version and kernel string, whichever are present, go into
    ``description``, truncated to 200 characters.

    Args:
        hostname: Device name in NetBox.
        node_status: Result of ``collect_node_status``.
        site_name: NetBox site name.

    Returns:
        An ``Entity(device=...)`` tagged ``proxmox``/``pbs``.
    """
    # ``or {}`` also covers keys that are present but null.
    cpuinfo = node_status.get("cpuinfo") or {}
    memory = node_status.get("memory") or {}
    info = node_status.get("info") or {}
    mem_gb = (memory.get("total") or 0) / (1024 ** 3)
    pbs_version = info.get("version", "")
    kernel = node_status.get("kversion", "")
    # NOTE(review): the PBS node status appears to report the core count as
    # "cpus" rather than "cores"; accept either so the field is not always "?".
    cores = cpuinfo.get("cores", cpuinfo.get("cpus", "?"))
    # Report whichever of version/kernel is available; the kernel alone is
    # still useful when the payload carries no version.
    desc_parts = []
    if pbs_version:
        desc_parts.append(f"PBS {pbs_version}")
    if kernel:
        desc_parts.append(f"kernel {kernel}")
    return Entity(device=Device(
        name=hostname,
        device_type=DeviceType(
            model=PBS_MODEL,
            manufacturer=Manufacturer(name=PBS_MANUFACTURER),
        ),
        role=DeviceRole(name="Backup Server"),
        platform=Platform(
            name="Proxmox Backup Server",
            manufacturer=Manufacturer(name=PBS_MANUFACTURER),
        ),
        site=Site(name=site_name),
        status="active",
        description=", ".join(desc_parts)[:200],
        comments=f"CPU: {cpuinfo.get('model', 'N/A')}, "
                 f"Cores: {cores}, "
                 f"Memory: {mem_gb:.1f} GB",
        tags=["proxmox", "pbs"],
    ))
def _join_members(value) -> str:
    """Render an interface member list (``bridge_ports``/``slaves``) as text.

    Lists render as ``"eno1 eno2"`` instead of a Python repr; strings pass
    through unchanged.
    """
    if isinstance(value, (list, tuple)):
        return " ".join(str(v) for v in value)
    return str(value)


def _parse_mtu(value):
    """Return *value* as an int MTU, or None if absent or not numeric.

    String MTUs such as ``"1500"`` are cast, matching the MTU string-to-int
    fix made in the proxmox_collector. Unparsable values are dropped rather
    than failing the whole host.
    """
    if value is None or value == "":
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        log.debug("Ignoring non-numeric MTU %r", value)
        return None


def build_interface_entities(hostname: str, interfaces: list[dict],
                             site_name: str) -> list[Entity]:
    """Build one Interface entity per non-loopback network interface.

    Each interface is attached to the PBS device via ``_device_ref``. Bridge
    ports, bond members and the interface comment are folded into the
    description, which is truncated to 200 characters.

    Args:
        hostname: Device name the interfaces belong to.
        interfaces: Result of ``collect_node_networks``.
        site_name: NetBox site of the device.

    Returns:
        A list of ``Entity(interface=...)``, in input order, skipping ``lo``.
    """
    entities = []
    for iface in interfaces:
        name = iface.get("iface", "")
        if name == "lo":
            continue
        pve_type = iface.get("type", "")
        desc_parts = []
        for key in ("bridge_ports", "slaves"):
            if iface.get(key):
                desc_parts.append(f"{key}: {_join_members(iface[key])}")
        comments = str(iface.get("comments") or "").strip()
        if comments:
            desc_parts.append(comments)
        entities.append(Entity(interface=Interface(
            device=_device_ref(hostname, site_name),
            name=name,
            type=_map_interface_type(pve_type, name),
            enabled=bool(iface.get("active", 0)),
            mtu=_parse_mtu(iface.get("mtu")),
            description=", ".join(desc_parts)[:200],
            tags=["proxmox", "pbs"],
        )))
    return entities
def _ipv4_cidr(iface: dict):
    """Return the interface's IPv4 address in CIDR form, or None if it has none.

    Prefers ``cidr``. Otherwise it combines ``address`` with ``netmask``.
    With a bare ``address`` it falls back to ``/24``; this is a guess,
    because the real prefix is unknown.
    """
    cidr = iface.get("cidr")
    if cidr:
        return cidr
    address = iface.get("address")
    if not address:
        return None
    netmask = iface.get("netmask")
    if netmask:
        return f"{address}/{_netmask_to_prefix(netmask)}"
    return f"{address}/24"


def build_ip_entities(hostname: str, interfaces: list[dict],
                      site_name: str) -> list[Entity]:
    """Build IPAddress entities for every non-loopback interface address.

    For each interface, the IPv4 address (if any) is emitted first, then the
    IPv6 ``cidr6`` (if any). Interfaces with only an IPv6 address are
    included. Each IP is assigned to its interface on the PBS device.

    Args:
        hostname: Device name the interfaces belong to.
        interfaces: Result of ``collect_node_networks``.
        site_name: NetBox site of the device.

    Returns:
        A list of ``Entity(ip_address=...)``.
    """
    entities = []
    for iface in interfaces:
        name = iface.get("iface", "")
        if name == "lo":
            continue
        pve_type = iface.get("type", "")
        addresses = [a for a in (_ipv4_cidr(iface), iface.get("cidr6")) if a]
        for address in addresses:
            entities.append(Entity(ip_address=IPAddress(
                address=address,
                status="active",
                assigned_object_interface=Interface(
                    device=_device_ref(hostname, site_name),
                    name=name,
                    type=_map_interface_type(pve_type, name),
                ),
                tags=["proxmox", "pbs"],
            )))
    return entities
def build_datastore_entities(hostname: str, datastores: list[dict],
                             usage_map: dict, site_name: str,
                             pbs: ProxmoxAPI, port: int = 8007) -> list[Entity]:
    """Build one Service entity per PBS datastore.

    Args:
        hostname: Device name the services are attached to.
        datastores: Entries from ``collect_datastores``. The store name is
            read from ``name``, then ``store``.
        usage_map: Usage records keyed by store name. ``total`` and ``used``
            are treated as bytes.
        site_name: NetBox site of the device.
        pbs: Live API session, used to count snapshots. This costs one
            snapshot-list call per datastore.
        port: TCP port recorded on the Service. It should match the port
            used to reach the host; defaults to 8007.

    Returns:
        One ``Entity(service=...)`` per datastore, in input order.
    """
    gib = 1024 ** 3
    entities = []
    for ds in datastores:
        store_name = ds.get("name", ds.get("store", "unknown"))
        usage = usage_map.get(store_name) or {}
        total = usage.get("total") or 0
        used = usage.get("used") or 0
        total_gb = total / gib
        used_gb = used / gib
        pct = (used / total * 100) if total else 0
        path = ds.get("path", "N/A")
        gc_schedule = ds.get("gc-schedule", "N/A")
        # The helper already swallows API errors; ``or []`` guards a null payload.
        snap_count = len(collect_datastore_snapshots(pbs, store_name) or [])
        log.info("  Datastore '%s': %.1f GB total, %.1f GB used (%.0f%%), %d snapshots",
                 store_name, total_gb, used_gb, pct, snap_count)
        entities.append(Entity(service=Service(
            device=_device_ref(hostname, site_name),
            name=f"ds-{store_name}",
            protocol="tcp",
            ports=[port],
            description=f"PBS Datastore: {total_gb:.1f} GB total, {used_gb:.1f} GB used ({pct:.0f}%)",
            comments=f"Path: {path}\nSnapshots: {snap_count}\nGC schedule: {gc_schedule}",
            tags=["proxmox", "pbs", "datastore"],
        )))
    return entities
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def collect_all_entities(pbs: ProxmoxAPI, host_cfg: dict) -> list[Entity]:
    """Collect device, interface, IP and datastore entities from one PBS host.

    Exceptions from the node-status, network and datastore-list calls
    propagate to the caller. Usage and snapshot lookups are best-effort.

    Args:
        pbs: Session from ``connect_pbs``.
        host_cfg: One entry from ``get_pbs_hosts``.

    Returns:
        The device entity first, then interfaces, IPs and datastore services.
    """
    site = host_cfg["site_name"]
    entities: list[Entity] = []
    node_status = collect_node_status(pbs) or {}
    # Fall back to the configured address when the status has no hostname.
    hostname = node_status.get("hostname") or host_cfg["pbs_host"]
    log.info("PBS host: %s", hostname)
    # Device
    entities.append(build_device_entity(hostname, node_status, site))
    # Interfaces and IPs
    networks = collect_node_networks(pbs) or []
    entities.extend(build_interface_entities(hostname, networks, site))
    entities.extend(build_ip_entities(hostname, networks, site))
    # Datastores as Services
    datastores = collect_datastores(pbs) or []
    log.info("  Datastores: %d", len(datastores))
    usage_list = collect_datastore_usage(pbs) or []
    usage_map = {u.get("store", u.get("name", "")): u for u in usage_list}
    entities.extend(build_datastore_entities(
        hostname, datastores, usage_map, site, pbs,
        # Record the port actually configured (PBS_PORT[_N]), not a constant.
        port=host_cfg.get("pbs_port", 8007),
    ))
    return entities
def ingest_entities(entities: list[Entity], config: dict, dry_run: bool = False) -> None:
    """Send *entities* to Diode, or print them when *dry_run* is set.

    In dry-run mode a ``DiodeDryRunClient`` is used and *config* is never
    read. Otherwise a ``DiodeClient`` is opened with the target and client
    credentials from *config*. Errors reported in the ingest response are
    logged, not raised.
    """
    if dry_run:
        preview = DiodeDryRunClient(app_name="pbs-collector")
        log.info("Dry-run mode: writing entities to stdout")
        preview.ingest(entities=entities)
        return
    target = config["diode_target"]
    client_options = dict(
        target=target,
        client_id=config["client_id"],
        client_secret=config["client_secret"],
        app_name="pbs-collector",
        app_version="0.1.0",
    )
    with DiodeClient(**client_options) as client:
        log.info("Ingesting %d entities to %s ...", len(entities), target)
        result = client.ingest(entities=entities)
        problems = result.errors
        if not problems:
            log.info("Ingestion successful")
        else:
            log.error("Ingestion errors: %s", problems)
def main():
    """Command-line entry point: collect from every PBS host, then ingest.

    A failure on one host is logged and does not stop the others. Whatever
    was collected is still ingested. The process then exits with status 1 if
    any host failed, so schedulers can detect partial failures.
    ``--dry-run`` does not require Diode credentials.
    """
    parser = argparse.ArgumentParser(description="Proxmox Backup Server collector for NetBox")
    parser.add_argument("--dry-run", action="store_true", help="Output entities without ingesting")
    parser.add_argument("--log-level", default="INFO",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR"])
    parser.add_argument("--env-file", default=".env", help="Path to .env file")
    args = parser.parse_args()
    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s %(levelname)-8s %(message)s",
    )
    load_dotenv(args.env_file)
    # The dry-run path of ingest_entities never reads the Diode config, so
    # don't demand a client secret just to preview output.
    diode_config = {} if args.dry_run else get_diode_config()
    pbs_hosts = get_pbs_hosts()
    all_entities: list[Entity] = []
    failed_hosts: list[str] = []
    for i, host_cfg in enumerate(pbs_hosts, 1):
        log.info("=== PBS Host %d/%d: %s ===", i, len(pbs_hosts), host_cfg["pbs_host"])
        try:
            pbs = connect_pbs(host_cfg)
            entities = collect_all_entities(pbs, host_cfg)
        except Exception:
            # Top-level boundary: one broken host must not block the others.
            log.exception("Failed to collect from PBS host %s", host_cfg["pbs_host"])
            failed_hosts.append(host_cfg["pbs_host"])
            continue
        log.info("Collected %d entities from %s", len(entities), host_cfg["pbs_host"])
        all_entities.extend(entities)
    log.info("Total: %d entities from %d PBS host(s)", len(all_entities), len(pbs_hosts))
    if not all_entities:
        log.warning("No entities collected. Exiting.")
    else:
        ingest_entities(all_entities, diode_config, dry_run=args.dry_run)
        log.info("Done.")
    if failed_hosts:
        log.error("Collection failed for %d PBS host(s): %s",
                  len(failed_hosts), ", ".join(failed_hosts))
        sys.exit(1)


if __name__ == "__main__":
    main()