From 37b92c166a41dc132985b4ca47e48a4133b228a3 Mon Sep 17 00:00:00 2001 From: sam Date: Sat, 28 Feb 2026 16:49:25 -0700 Subject: [PATCH] Fix create_message_chunks import error across all collectors The installed Diode SDK version does not export create_message_chunks. Replace chunked ingestion with direct client.ingest() calls. Co-Authored-By: Claude Opus 4.6 --- collectors/cml_collector.py | 14 +++++--------- collectors/docker_collector.py | 14 +++++--------- collectors/network_collector.py | 14 +++++--------- collectors/observium_collector.py | 14 +++++--------- collectors/unifi_collector.py | 14 +++++--------- collectors/vmware_collector.py | 14 +++++--------- collectors/zabbix_collector.py | 14 +++++--------- 7 files changed, 35 insertions(+), 63 deletions(-) diff --git a/collectors/cml_collector.py b/collectors/cml_collector.py index faf956b..f9d78a3 100644 --- a/collectors/cml_collector.py +++ b/collectors/cml_collector.py @@ -415,8 +415,6 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: log.info("Ingesting %d entities to %s ...", len(entities), target) - from netboxlabs.diode.sdk.ingester import create_message_chunks - with DiodeClient( target=target, client_id=client_id, @@ -424,13 +422,11 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: app_name="cml-collector", app_version="0.1.0", ) as client: - chunks = create_message_chunks(entities) - for idx, chunk in enumerate(chunks): - resp = client.ingest(entities=chunk) - if resp.errors: - log.error("Chunk %d errors: %s", idx, resp.errors) - else: - log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + resp = client.ingest(entities=entities) + if resp.errors: + log.error("Ingestion errors: %s", resp.errors) + else: + log.info("Ingestion successful") def main(): diff --git a/collectors/docker_collector.py b/collectors/docker_collector.py index 55bd38e..ebe2d98 100644 --- a/collectors/docker_collector.py +++ b/collectors/docker_collector.py @@ -315,8 +315,6 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: log.info("Ingesting %d entities to %s ...", len(entities), target) - from netboxlabs.diode.sdk.ingester import create_message_chunks - with DiodeClient( target=target, client_id=client_id, @@ -324,13 +322,11 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: app_name="docker-collector", app_version="0.1.0", ) as client: - chunks = create_message_chunks(entities) - for idx, chunk in enumerate(chunks): - resp = client.ingest(entities=chunk) - if resp.errors: - log.error("Chunk %d errors: %s", idx, resp.errors) - else: - log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + resp = client.ingest(entities=entities) + if resp.errors: + log.error("Ingestion errors: %s", resp.errors) + else: + log.info("Ingestion successful") def main(): diff --git a/collectors/network_collector.py b/collectors/network_collector.py index ffa75c0..1591a3d 100644 --- a/collectors/network_collector.py +++ b/collectors/network_collector.py @@ -1608,8 +1608,6 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: log.info("Ingesting %d entities to %s ...", len(entities), target) - from netboxlabs.diode.sdk.ingester import create_message_chunks - with DiodeClient( target=target, client_id=client_id, @@ -1617,13 +1615,11 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: app_name="network-collector", app_version="0.1.0", ) as client: - chunks = create_message_chunks(entities) - for idx, chunk in enumerate(chunks): - resp = client.ingest(entities=chunk) - if resp.errors: - log.error("Chunk %d errors: %s", idx, resp.errors) - else: - log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + resp = client.ingest(entities=entities) + if resp.errors: + log.error("Ingestion errors: %s", resp.errors) + else: + log.info("Ingestion successful") # --------------------------------------------------------------------------- diff --git a/collectors/observium_collector.py b/collectors/observium_collector.py index 47d7930..f3ff2bc 100644 --- a/collectors/observium_collector.py +++ b/collectors/observium_collector.py @@ -337,8 +337,6 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: log.info("Ingesting %d entities to %s ...", len(entities), target) - from netboxlabs.diode.sdk.ingester import create_message_chunks - with DiodeClient( target=target, client_id=client_id, @@ -346,13 +344,11 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: app_name="observium-collector", app_version="0.1.0", ) as client: - chunks = create_message_chunks(entities) - for idx, chunk in enumerate(chunks): - resp = client.ingest(entities=chunk) - if resp.errors: - log.error("Chunk %d errors: %s", idx, resp.errors) - else: - log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + resp = client.ingest(entities=entities) + if resp.errors: + log.error("Ingestion errors: %s", resp.errors) + else: + log.info("Ingestion successful") def main(): diff --git a/collectors/unifi_collector.py b/collectors/unifi_collector.py index 7c12298..75496d0 100644 --- a/collectors/unifi_collector.py +++ b/collectors/unifi_collector.py @@ -713,8 +713,6 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: log.info("Ingesting %d entities to %s ...", len(entities), target) - from netboxlabs.diode.sdk.ingester import create_message_chunks - with DiodeClient( target=target, client_id=client_id, @@ -722,13 +720,11 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: app_name="unifi-collector", app_version="0.1.0", ) as client: - chunks = create_message_chunks(entities) - for idx, chunk in enumerate(chunks): - resp = client.ingest(entities=chunk) - if resp.errors: - log.error("Chunk %d errors: %s", idx, resp.errors) - else: - log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + resp = client.ingest(entities=entities) + if resp.errors: + log.error("Ingestion errors: %s", resp.errors) + else: + log.info("Ingestion successful") def main(): diff --git a/collectors/vmware_collector.py b/collectors/vmware_collector.py index 874756c..0c57a81 100644 --- a/collectors/vmware_collector.py +++ b/collectors/vmware_collector.py @@ -503,8 +503,6 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: log.info("Ingesting %d entities to %s ...", len(entities), target) - from netboxlabs.diode.sdk.ingester import create_message_chunks - with DiodeClient( target=target, client_id=client_id, @@ -512,13 +510,11 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: app_name="vmware-collector", app_version="0.1.0", ) as client: - chunks = create_message_chunks(entities) - for idx, chunk in enumerate(chunks): - resp = client.ingest(entities=chunk) - if resp.errors: - log.error("Chunk %d errors: %s", idx, resp.errors) - else: - log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + resp = client.ingest(entities=entities) + if resp.errors: + log.error("Ingestion errors: %s", resp.errors) + else: + log.info("Ingestion successful") def main(): diff --git a/collectors/zabbix_collector.py b/collectors/zabbix_collector.py index cd82734..b706659 100644 --- a/collectors/zabbix_collector.py +++ b/collectors/zabbix_collector.py @@ -331,8 +331,6 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: log.info("Ingesting %d entities to %s ...", len(entities), target) - from netboxlabs.diode.sdk.ingester import create_message_chunks - with DiodeClient( target=target, client_id=client_id, @@ -340,13 +338,11 @@ def ingest_entities(entities: list[Entity], dry_run: bool = False) -> None: app_name="zabbix-collector", app_version="0.1.0", ) as client: - chunks = create_message_chunks(entities) - for idx, chunk in enumerate(chunks): - resp = client.ingest(entities=chunk) - if resp.errors: - log.error("Chunk %d errors: %s", idx, resp.errors) - else: - log.info("Chunk %d: %d entities ingested", idx, len(chunk)) + resp = client.ingest(entities=entities) + if resp.errors: + log.error("Ingestion errors: %s", resp.errors) + else: + log.info("Ingestion successful") def main():