- Add tests/test_ingestion.py for end-to-end Diode pipeline verification - Fix OAuth2 client scopes: reconciler uses diode:reconcile, netbox-to-diode needs diode:read diode:write netbox:read netbox:write - Rewrite bootstrap-clients.sh with upsert behavior (delete+recreate) so scope and secret changes are applied on restart - Rewrite nginx.conf in setup.sh to match upstream auth_request architecture - Update .claude/settings.json with expanded tool permissions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
80 lines
2.3 KiB
Python
80 lines
2.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Minimal Diode ingestion test.
|
|
|
|
Pushes a test Site and Device through the Diode pipeline to verify
|
|
the full flow: SDK -> nginx -> ingester -> Redis -> reconciler -> NetBox
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
|
|
from netboxlabs.diode.sdk import DiodeClient
|
|
from netboxlabs.diode.sdk.ingester import (
|
|
Device,
|
|
DeviceRole,
|
|
DeviceType,
|
|
Entity,
|
|
Manufacturer,
|
|
Platform,
|
|
Site,
|
|
)
|
|
|
|
DIODE_TARGET = os.getenv("DIODE_TARGET", "grpc://localhost:8080/diode")
|
|
DIODE_CLIENT_ID = os.getenv("DIODE_CLIENT_ID", os.getenv("INGESTER_CLIENT_ID", "diode-ingester"))
|
|
DIODE_CLIENT_SECRET = os.getenv("DIODE_CLIENT_SECRET", os.getenv("INGESTER_CLIENT_SECRET"))
|
|
|
|
if not DIODE_CLIENT_SECRET:
|
|
print("ERROR: DIODE_CLIENT_SECRET must be set")
|
|
sys.exit(1)
|
|
|
|
|
|
def main():
|
|
print(f"Connecting to Diode at {DIODE_TARGET}...")
|
|
|
|
with DiodeClient(
|
|
target=DIODE_TARGET,
|
|
client_id=DIODE_CLIENT_ID,
|
|
client_secret=DIODE_CLIENT_SECRET,
|
|
app_name="diode-test",
|
|
app_version="0.0.1",
|
|
) as client:
|
|
entities = [
|
|
# Test Site
|
|
Entity(site=Site(
|
|
name="Test Site - Diode",
|
|
status="active",
|
|
description="Created by Diode ingestion test",
|
|
)),
|
|
|
|
# Test Device
|
|
Entity(device=Device(
|
|
name="test-device-diode-01",
|
|
device_type=DeviceType(
|
|
model="Test Model",
|
|
manufacturer=Manufacturer(name="Test Manufacturer"),
|
|
),
|
|
platform=Platform(name="Linux"),
|
|
site=Site(name="Test Site - Diode"),
|
|
role=DeviceRole(name="Test Role"),
|
|
status="active",
|
|
serial="DIODE-TEST-001",
|
|
comments="Created by Diode ingestion test",
|
|
)),
|
|
]
|
|
|
|
print(f"Ingesting {len(entities)} entities...")
|
|
response = client.ingest(entities=entities)
|
|
|
|
if response.errors:
|
|
print(f"ERRORS: {response.errors}")
|
|
sys.exit(1)
|
|
else:
|
|
print("Ingestion successful!")
|
|
print("Check NetBox for 'Test Site - Diode' and 'test-device-diode-01'")
|
|
print(f" -> The reconciler will process these shortly.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|