netbox-diode-project/collectors/docker_collector.py

359 lines
11 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""Docker container collector for NetBox via Diode SDK.

Discovers Docker containers across one or more hosts and ingests them
into NetBox as VirtualMachines with VMInterfaces and IPAddresses.

Usage:
    python collectors/docker_collector.py --dry-run
    python collectors/docker_collector.py
"""
import argparse
import logging
import os
import sys
import docker
from netboxlabs.diode.sdk import DiodeClient, DiodeDryRunClient
from netboxlabs.diode.sdk.ingester import (
Cluster,
ClusterType,
CustomFieldValue,
Device,
DeviceRole,
DeviceType,
Entity,
IPAddress,
Manufacturer,
Platform,
Site,
VirtualMachine,
VMInterface,
)
log = logging.getLogger("docker-collector")
# ---------------------------------------------------------------------------
# Status mappings
# ---------------------------------------------------------------------------
# Translates the state string reported by Docker (``container.status``) into
# the status value placed on the VirtualMachine entity. States that are not
# listed here fall back to "active" at the lookup site in
# build_container_entities().
CONTAINER_STATUS_MAP = {
    "running": "active",
    # NOTE(review): paused and restarting containers are reported as active —
    # confirm this is the intended representation.
    "paused": "active",
    "restarting": "active",
    "created": "planned",
    "exited": "offline",
    "dead": "failed",
    "removing": "decommissioning",
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_dotenv(path: str = ".env") -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Variables already present in the environment are never overwritten, so
    real environment variables take precedence over the file. A missing file
    is silently ignored.

    Parsing rules:
      * blank lines, ``#`` comment lines and lines without ``=`` are skipped;
      * an optional leading ``export `` is dropped;
      * lines with an empty key (``=value``) are skipped;
      * one pair of *matching* surrounding quotes is removed from the value,
        so a quote character that is part of the value is preserved.

    Args:
        path: Location of the env file to read.
    """
    if not os.path.isfile(path):
        return
    # Be explicit about the encoding: the platform default is not always UTF-8.
    with open(path, encoding="utf-8") as fh:
        for raw_line in fh:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            if line.startswith("export "):
                # Shell-style "export KEY=value" lines are common in .env files.
                line = line[len("export "):].lstrip()
            key, _, val = line.partition("=")
            key = key.strip()
            if not key:
                # An empty name is not a valid environment variable and would
                # make os.environ raise ValueError.
                continue
            val = val.strip()
            if len(val) >= 2 and val[0] == val[-1] and val[0] in "\"'":
                val = val[1:-1]
            os.environ.setdefault(key, val)
def get_config() -> dict:
    """Build the collector configuration from environment variables.

    Environment variables read:
        DOCKER_HOSTS: Comma-separated list of Docker endpoints. When unset,
            empty, or containing only blank entries, the single pseudo-host
            ``"local"`` is used (the local Docker socket).
        DOCKER_SITE: Site name attached to every entity (default ``"main"``).
        DOCKER_TLS_VERIFY: Enables TLS verification when set to one of
            ``true``, ``1``, ``yes`` or ``on`` (case-insensitive).

    Returns:
        A dict with the keys ``"hosts"`` (list of str), ``"site"`` (str) and
        ``"tls_verify"`` (bool).
    """
    raw_hosts = os.environ.get("DOCKER_HOSTS", "")
    # A value such as " , " has no usable entries; fall back to the local
    # socket instead of returning an empty list that collects nothing.
    hosts = [h.strip() for h in raw_hosts.split(",") if h.strip()] or ["local"]

    # Docker tooling conventionally exports DOCKER_TLS_VERIFY=1, so accept the
    # usual truthy spellings rather than only the literal "true".
    tls_flag = os.environ.get("DOCKER_TLS_VERIFY", "false").strip().lower()

    return {
        "hosts": hosts,
        "site": os.environ.get("DOCKER_SITE", "main"),
        "tls_verify": tls_flag in ("true", "1", "yes", "on"),
    }
# ---------------------------------------------------------------------------
# VM reference helper
# ---------------------------------------------------------------------------
def _vm_ref(name: str, cluster_name: str, site_name: str) -> VirtualMachine:
    """Return a VirtualMachine object that identifies a container's VM.

    The object carries only the identifying fields (name, site, cluster and
    role) and is used wherever another entity has to point at the VM.

    Args:
        name: VirtualMachine name.
        cluster_name: Name of the "Docker"-typed cluster the VM belongs to.
        site_name: Site used for both the VM and the cluster scope.
    """
    docker_cluster = Cluster(
        name=cluster_name,
        type=ClusterType(name="Docker"),
        scope_site=Site(name=site_name),
    )
    return VirtualMachine(
        name=name,
        site=Site(name=site_name),
        cluster=docker_cluster,
        role=DeviceRole(name="Docker Container"),
    )
# ---------------------------------------------------------------------------
# Data collection
# ---------------------------------------------------------------------------
def connect_docker(host: str, tls_verify: bool = False) -> docker.DockerClient:
    """Create a Docker client for *host*.

    Args:
        host: Either the literal ``"local"`` (client is built from the
            environment via ``docker.from_env()``) or a base URL handed to
            ``docker.DockerClient``.
        tls_verify: When true and *host* starts with ``https://``, a
            ``TLSConfig(verify=True)`` is passed to the client; otherwise no
            TLS configuration is supplied.

    Returns:
        The connected ``docker.DockerClient``.
    """
    if host == "local":
        return docker.from_env()

    use_tls = host.startswith("https://") and tls_verify
    tls_config = docker.tls.TLSConfig(verify=True) if use_tls else None
    return docker.DockerClient(base_url=host, tls=tls_config)
def get_host_info(client: docker.DockerClient) -> dict:
    """Return the system information reported by ``client.info()``."""
    host_info = client.info()
    return host_info
def get_containers(client: docker.DockerClient, all_containers: bool = True) -> list:
    """List containers known to *client*.

    Args:
        client: Docker client to query.
        all_containers: Forwarded as ``all=`` to ``client.containers.list``.

    Returns:
        Whatever ``client.containers.list`` returns for that setting.
    """
    found = client.containers.list(all=all_containers)
    return found
# ---------------------------------------------------------------------------
# Entity builders
# ---------------------------------------------------------------------------
def build_cluster_entity(host_name: str, site_name: str) -> Entity:
    """Wrap a Docker host in a Cluster entity.

    Args:
        host_name: Used as the cluster name.
        site_name: Site the cluster is scoped to.

    Returns:
        An ``Entity`` holding an active, "docker"-tagged cluster of type
        "Docker".
    """
    docker_cluster = Cluster(
        name=host_name,
        type=ClusterType(name="Docker"),
        scope_site=Site(name=site_name),
        status="active",
        tags=["docker"],
    )
    return Entity(cluster=docker_cluster)
def _container_image_label(container) -> str:
    """Return a short, human-readable image identifier for *container*.

    Prefers the first image tag, then the first 20 characters of the image
    id. If the image object cannot be obtained, falls back to the image name
    recorded in the container's own inspect data, and finally to "unknown".
    """
    try:
        image = container.image
        if image is not None:
            if image.tags:
                return image.tags[0]
            return str(image.id)[:20]
    except Exception as exc:
        # NOTE(review): in the Docker SDK, resolving ``container.image`` looks
        # the image up on the daemon, which can fail (for example when the
        # image has since been removed). That must not discard the container.
        log.debug(" Image lookup failed for %s: %s", container.name, exc)
    config = (container.attrs or {}).get("Config") or {}
    return config.get("Image") or "unknown"


def _ip_entity(address: str, vm_ref: VirtualMachine, iface_name: str) -> Entity:
    """Build an active IPAddress entity bound to interface *iface_name* of *vm_ref*."""
    return Entity(ip_address=IPAddress(
        address=address,
        status="active",
        assigned_object_vm_interface=VMInterface(
            virtual_machine=vm_ref,
            name=iface_name,
        ),
        tags=["docker"],
    ))


def build_container_entities(container, host_name: str,
                             site_name: str) -> list[Entity]:
    """Build VirtualMachine + VMInterface + IPAddress entities for a container.

    Args:
        container: Container object exposing ``name``, ``status``, ``image``,
            ``short_id``, ``labels`` and ``attrs`` (inspect data).
        host_name: Name of the Docker host; used as the cluster name.
        site_name: Site assigned to the VM and its cluster.

    Returns:
        A list whose first element is the VirtualMachine entity, followed by
        one VMInterface entity per attached network and an IPAddress entity
        for each IPv4 / global IPv6 address found on that network. Problems
        while reading network data are logged as a warning; the entities
        built up to that point are still returned.
    """
    name = container.name
    status = CONTAINER_STATUS_MAP.get(container.status, "active")
    image = _container_image_label(container)

    # Custom fields: always the short container id, plus the compose project
    # when the container carries that label.
    labels = container.labels or {}
    custom_fields = {
        "docker_container_id": CustomFieldValue(text=container.short_id),
    }
    compose_project = labels.get("com.docker.compose.project", "")
    if compose_project:
        custom_fields["docker_compose_project"] = CustomFieldValue(text=compose_project)

    entities = [Entity(virtual_machine=VirtualMachine(
        name=name,
        status=status,
        site=Site(name=site_name),
        cluster=Cluster(
            name=host_name,
            type=ClusterType(name="Docker"),
            scope_site=Site(name=site_name),
        ),
        role=DeviceRole(name="Docker Container"),
        platform=Platform(name="Docker"),
        comments=f"Image: {image}",
        tags=["docker"],
        custom_fields=custom_fields,
    ))]

    # Network interfaces and IPs
    vm_ref = _vm_ref(name, host_name, site_name)
    try:
        # container.attrs has the full inspect data; tolerate null sections.
        settings = container.attrs.get("NetworkSettings") or {}
        networks = settings.get("Networks") or {}
        for net_name, net_data in networks.items():
            net_data = net_data or {}
            # The interface is named after the Docker network, capped at 64 chars.
            iface_name = net_name[:64]

            iface_kwargs = dict(
                virtual_machine=vm_ref,
                name=iface_name,
                enabled=True,
                tags=["docker"],
            )
            mac = net_data.get("MacAddress", "")
            if mac:
                iface_kwargs["mac_address"] = mac
            entities.append(Entity(vm_interface=VMInterface(**iface_kwargs)))

            # IPv4 address; a missing or zero prefix length defaults to /24.
            ip = net_data.get("IPAddress", "")
            if ip and ip != "0.0.0.0":
                prefix_len = net_data.get("IPPrefixLen", 0) or 24
                entities.append(_ip_entity(f"{ip}/{prefix_len}", vm_ref, iface_name))

            # Global IPv6 address; a missing or zero prefix length defaults to /64.
            ipv6 = net_data.get("GlobalIPv6Address", "")
            if ipv6:
                ipv6_prefix = net_data.get("GlobalIPv6PrefixLen", 0) or 64
                entities.append(_ip_entity(f"{ipv6}/{ipv6_prefix}", vm_ref, iface_name))
    except Exception as exc:
        log.warning(" Network info unavailable for %s: %s", name, exc)
    return entities
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
def _collect_host_entities(client, host: str, site_name: str,
                           include_stopped: bool) -> list[Entity]:
    """Build the cluster entity and all container entities for one Docker host.

    Failures to read host info or to list containers are logged and do not
    raise; a container that cannot be processed is logged and skipped.
    """
    # Get host info for cluster name; fall back to the configured host string.
    try:
        info = get_host_info(client)
        host_name = info.get("Name", host)
        log.info(" Host: %s (Docker %s, %d containers)",
                 host_name, info.get("ServerVersion", "?"),
                 info.get("Containers", 0))
    except Exception as exc:
        log.warning(" Cannot get host info: %s", exc)
        host_name = host

    entities = [build_cluster_entity(host_name, site_name)]

    try:
        containers = get_containers(client, include_stopped)
    except Exception as exc:
        log.error(" Failed to list containers on %s: %s", host_name, exc)
        return entities

    log.info(" Found %d containers", len(containers))
    for container in containers:
        try:
            entities.extend(build_container_entities(
                container, host_name, site_name
            ))
        except Exception as exc:
            log.error(" Failed to process container %s: %s",
                      container.name, exc)
    return entities


def collect_all_entities(cfg: dict) -> list[Entity]:
    """Collect entities from every configured Docker host.

    Args:
        cfg: Configuration dict with the keys ``"hosts"``, ``"site"`` and
            ``"tls_verify"``. The optional key ``"all_containers"`` controls
            whether stopped containers are included; it defaults to True when
            absent.

    Returns:
        All entities gathered from the hosts that could be reached. Hosts
        that cannot be connected to are logged and skipped.
    """
    site_name = cfg["site"]
    include_stopped = cfg.get("all_containers", True)
    entities: list[Entity] = []
    for host in cfg["hosts"]:
        log.info("Connecting to Docker host: %s", host)
        try:
            client = connect_docker(host, cfg["tls_verify"])
        except Exception as exc:
            log.error("Failed to connect to Docker host %s: %s", host, exc)
            continue
        try:
            entities.extend(
                _collect_host_entities(client, host, site_name, include_stopped)
            )
        finally:
            # Release the client's connections once this host is done.
            client.close()
    return entities


def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None:
    """Send *entities* to Diode, or only log them when *dry_run* is set.

    Connection settings come from the environment: ``DIODE_TARGET``,
    ``DIODE_CLIENT_ID`` (falling back to ``INGESTER_CLIENT_ID``) and
    ``DIODE_CLIENT_SECRET`` (falling back to ``INGESTER_CLIENT_SECRET``).

    Args:
        entities: Entities to ingest. An empty list only logs a warning.
        dry_run: When true, each entity is logged and nothing is sent.

    Raises:
        SystemExit: With status 1 when a real ingest is requested but no
            client secret is configured.
    """
    if not entities:
        log.warning("No entities to ingest")
        return

    if dry_run:
        log.info("DRY RUN: %d entities would be ingested", len(entities))
        for i, e in enumerate(entities):
            log.info(" [%d] %s", i, e)
        return

    target = os.environ.get("DIODE_TARGET", "grpc://localhost:8080/diode")
    client_id = os.environ.get(
        "DIODE_CLIENT_ID",
        os.environ.get("INGESTER_CLIENT_ID", "diode-ingester"),
    )
    client_secret = os.environ.get(
        "DIODE_CLIENT_SECRET",
        os.environ.get("INGESTER_CLIENT_SECRET", ""),
    )
    if not client_secret:
        log.error("DIODE_CLIENT_SECRET not set — cannot ingest")
        sys.exit(1)

    log.info("Ingesting %d entities to %s ...", len(entities), target)
    with DiodeClient(
        target=target,
        client_id=client_id,
        client_secret=client_secret,
        app_name="docker-collector",
        app_version="0.1.0",
    ) as client:
        resp = client.ingest(entities=entities)
        if resp.errors:
            log.error("Ingestion errors: %s", resp.errors)
        else:
            log.info("Ingestion successful")


def main():
    """Command-line entry point: parse arguments, collect, then ingest.

    ``--all`` is forwarded to the collection step, so stopped containers are
    included only when the flag is given, as the option's help text states.
    """
    parser = argparse.ArgumentParser(description="Docker container collector for NetBox")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--all", action="store_true",
                        help="Include stopped containers (default: running only)")
    parser.add_argument("--log-level", default="INFO",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR"])
    parser.add_argument("--env-file", default=".env")
    args = parser.parse_args()

    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )

    load_dotenv(args.env_file)
    cfg = get_config()
    # Previously --all was parsed but never used; hand it to the collector.
    cfg["all_containers"] = args.all
    if not args.all:
        log.info("Collecting running containers only (pass --all to include stopped)")

    entities = collect_all_entities(cfg)
    log.info("Total entities: %d", len(entities))
    ingest_entities(entities, dry_run=args.dry_run)
    log.info("Done!")


if __name__ == "__main__":
    main()