netbox-diode-project/tests/test_ingestion.py
sam fbde598be3 Add ingestion test and fix OAuth2 scopes and bootstrap logic
- Add tests/test_ingestion.py for end-to-end Diode pipeline verification
- Fix OAuth2 client scopes: reconciler uses diode:reconcile, netbox-to-diode
  needs diode:read diode:write netbox:read netbox:write
- Rewrite bootstrap-clients.sh with upsert behavior (delete+recreate) so
  scope and secret changes are applied on restart
- Rewrite nginx.conf in setup.sh to match upstream auth_request architecture
- Update .claude/settings.json with expanded tool permissions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 01:55:37 -07:00

80 lines
2.3 KiB
Python

#!/usr/bin/env python3
"""
Minimal Diode ingestion test.
Pushes a test Site and Device through the Diode pipeline to verify
the full flow: SDK -> nginx -> ingester -> Redis -> reconciler -> NetBox
"""
import os
import sys
from netboxlabs.diode.sdk import DiodeClient
from netboxlabs.diode.sdk.ingester import (
Device,
DeviceRole,
DeviceType,
Entity,
Manufacturer,
Platform,
Site,
)
# Connection settings, overridable through the environment. The INGESTER_*
# variables are accepted as fallbacks for the corresponding DIODE_* ones.
DIODE_TARGET = os.getenv("DIODE_TARGET", "grpc://localhost:8080/diode")
DIODE_CLIENT_ID = os.getenv(
    "DIODE_CLIENT_ID", os.getenv("INGESTER_CLIENT_ID", "diode-ingester")
)
DIODE_CLIENT_SECRET = os.getenv(
    "DIODE_CLIENT_SECRET", os.getenv("INGESTER_CLIENT_SECRET")
)


def main():
    """Push one test Site and one test Device through Diode.

    Builds the two entities, sends them with a single ``client.ingest`` call
    and reports the outcome on the console.

    Raises:
        SystemExit: with status 1 when no client secret is configured, or
            when the ingest response carries a non-empty ``errors`` value.
    """
    # Validate the configuration here rather than at module level, so that
    # merely importing this file (it lives under tests/) never terminates
    # the importing process.
    if not DIODE_CLIENT_SECRET:
        print("ERROR: DIODE_CLIENT_SECRET must be set", file=sys.stderr)
        sys.exit(1)

    # Bound once: the device refers to the site by name, and both names are
    # repeated in the final hint printed to the operator.
    site_name = "Test Site - Diode"
    device_name = "test-device-diode-01"

    print(f"Connecting to Diode at {DIODE_TARGET}...")
    with DiodeClient(
        target=DIODE_TARGET,
        client_id=DIODE_CLIENT_ID,
        client_secret=DIODE_CLIENT_SECRET,
        app_name="diode-test",
        app_version="0.0.1",
    ) as client:
        entities = [
            # Test Site
            Entity(
                site=Site(
                    name=site_name,
                    status="active",
                    description="Created by Diode ingestion test",
                )
            ),
            # Test Device
            Entity(
                device=Device(
                    name=device_name,
                    device_type=DeviceType(
                        model="Test Model",
                        manufacturer=Manufacturer(name="Test Manufacturer"),
                    ),
                    platform=Platform(name="Linux"),
                    site=Site(name=site_name),
                    role=DeviceRole(name="Test Role"),
                    status="active",
                    serial="DIODE-TEST-001",
                    comments="Created by Diode ingestion test",
                )
            ),
        ]

        print(f"Ingesting {len(entities)} entities...")
        response = client.ingest(entities=entities)

        # Guard clause: report failures on stderr and stop with a non-zero
        # status so shell callers can detect the failure.
        if response.errors:
            print(f"ERRORS: {response.errors}", file=sys.stderr)
            sys.exit(1)

        print("Ingestion successful!")
        print(f"Check NetBox for '{site_name}' and '{device_name}'")
        print(" -> The reconciler will process these shortly.")


if __name__ == "__main__":
    main()