From 8c37eca7add679e06a8fb19a20d06633d0059734 Mon Sep 17 00:00:00 2001 From: Naveen Modi Date: Mon, 21 Aug 2023 10:59:19 +0530 Subject: [PATCH] new changes --- blockchainetl/jobs/exporters/kafka_exporter.py | 11 +++++++++-- setup.py | 2 +- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/blockchainetl/jobs/exporters/kafka_exporter.py b/blockchainetl/jobs/exporters/kafka_exporter.py index ac15024..703c431 100644 --- a/blockchainetl/jobs/exporters/kafka_exporter.py +++ b/blockchainetl/jobs/exporters/kafka_exporter.py @@ -38,6 +38,12 @@ def get_connection_url(self, output): def open(self): pass + def acked(err, msg): + if err is not None: + logging.error("Failed to deliver message: %s: %s" % (str(msg), str(err))) + else: + logging.debug("Message produced: %s" % (str(msg))) + def export_items(self, items): for item in items: item_type = item.get('type') @@ -54,8 +60,9 @@ def export_items(self, items): def export_item(self, item, item_type): data = json.dumps(item).encode('utf-8') logging.debug(data) - return self.producer.produce(self.item_type_to_topic_mapping[item_type],key="0x0000",value=data) - + return self.producer.produce(self.item_type_to_topic_mapping[item_type],key="0x0000",value=data, callback=self.acked) + + def convert_items(self, items): for item in items: diff --git a/setup.py b/setup.py index ab0a20d..4600d22 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ def read(fname): 'streaming': [ 'timeout-decorator==0.4.1', 'google-cloud-pubsub==0.39.1', - 'kafka-python==2.0.2' + 'confluent-kafka==2.2.0' ], 'dev': [ 'pytest~=4.3.0'