netbox-diode-project/tests/test_ingestion.py

80 lines
2.3 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Minimal Diode ingestion test.
Pushes a test Site and Device through the Diode pipeline to verify
the full flow: SDK -> nginx -> ingester -> Redis -> reconciler -> NetBox
"""
import os
import sys
from netboxlabs.diode.sdk import DiodeClient
from netboxlabs.diode.sdk.ingester import (
Device,
DeviceRole,
DeviceType,
Entity,
Manufacturer,
Platform,
Site,
)
DIODE_TARGET = os.getenv("DIODE_TARGET", "grpc://localhost:8080/diode")
DIODE_CLIENT_ID = os.getenv("DIODE_CLIENT_ID", os.getenv("INGESTER_CLIENT_ID", "diode-ingester"))
DIODE_CLIENT_SECRET = os.getenv("DIODE_CLIENT_SECRET", os.getenv("INGESTER_CLIENT_SECRET"))
if not DIODE_CLIENT_SECRET:
print("ERROR: DIODE_CLIENT_SECRET must be set")
sys.exit(1)
def main():
print(f"Connecting to Diode at {DIODE_TARGET}...")
with DiodeClient(
target=DIODE_TARGET,
client_id=DIODE_CLIENT_ID,
client_secret=DIODE_CLIENT_SECRET,
app_name="diode-test",
app_version="0.0.1",
) as client:
entities = [
# Test Site
Entity(site=Site(
name="Test Site - Diode",
status="active",
description="Created by Diode ingestion test",
)),
# Test Device
Entity(device=Device(
name="test-device-diode-01",
device_type=DeviceType(
model="Test Model",
manufacturer=Manufacturer(name="Test Manufacturer"),
),
platform=Platform(name="Linux"),
site=Site(name="Test Site - Diode"),
role=DeviceRole(name="Test Role"),
status="active",
serial="DIODE-TEST-001",
comments="Created by Diode ingestion test",
)),
]
print(f"Ingesting {len(entities)} entities...")
response = client.ingest(entities=entities)
if response.errors:
print(f"ERRORS: {response.errors}")
sys.exit(1)
else:
print("Ingestion successful!")
print("Check NetBox for 'Test Site - Diode' and 'test-device-diode-01'")
print(f" -> The reconciler will process these shortly.")
if __name__ == "__main__":
main()