Skip to content

Commit

Permalink
new changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Naveen Modi authored and Naveen Modi committed Aug 21, 2023
1 parent 7b128dc commit 8c37eca
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
11 changes: 9 additions & 2 deletions blockchainetl/jobs/exporters/kafka_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ def get_connection_url(self, output):
def open(self):
pass

def acked(err, msg):
if err is not None:
logging.error("Failed to deliver message: %s: %s" % (str(msg), str(err)))
else:
logging.debug("Message produced: %s" % (str(msg)))

def export_items(self, items):
for item in items:
item_type = item.get('type')
Expand All @@ -54,8 +60,9 @@ def export_items(self, items):
def export_item(self, item, item_type):
data = json.dumps(item).encode('utf-8')
logging.debug(data)
return self.producer.produce(self.item_type_to_topic_mapping[item_type],key="0x0000",value=data)

return self.producer.produce(self.item_type_to_topic_mapping[item_type],key="0x0000",value=data, callback=self.acked)



def convert_items(self, items):
for item in items:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def read(fname):
'streaming': [
'timeout-decorator==0.4.1',
'google-cloud-pubsub==0.39.1',
'kafka-python==2.0.2'
'confluent-kafka==2.2.0'
],
'dev': [
'pytest~=4.3.0'
Expand Down

0 comments on commit 8c37eca

Please sign in to comment.