Skip to content

Commit

Permalink
Fix to allow topic to be imported from kwargs
Browse files Browse the repository at this point in the history
  • Loading branch information
bourbonkk committed Oct 11, 2024
1 parent 6a54106 commit 47a70f4
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def wrap_produce(func, instance, tracer, args, kwargs):
headers = []
kwargs["headers"] = headers

topic = KafkaPropertiesExtractor.extract_produce_topic(args)
topic = KafkaPropertiesExtractor.extract_produce_topic(args, kwargs)
_enrich_span(
span,
topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ def _extract_argument(key, position, default_value, args, kwargs):
return kwargs.get(key, default_value)

@staticmethod
def extract_produce_topic(args):
def extract_produce_topic(args, kwargs):
"""extract topic from `produce` method arguments in Producer class"""
if len(args) > 0:
return args[0]
return "unknown"
return kwargs.get("topic") or (args[0] if args else "unknown")

@staticmethod
def extract_produce_headers(args, kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ def _compare_spans(self, spans, expected_spans):
expected_attribute_value, span.attributes[attribute_key]
)

def _assert_topic(self, expected_topic: str) -> None:
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
span = span_list[0]
self.assertEqual(span.attributes[SpanAttributes.MESSAGING_DESTINATION], expected_topic)


def test_producer_poll(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
message_queue = []
Expand All @@ -299,6 +306,8 @@ def test_producer_poll(self) -> None:
producer.produce(topic="topic-1", key="key-1", value="value-1")
msg = producer.poll()
self.assertIsNotNone(msg)
self._assert_topic("topic-1")


def test_producer_flush(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
Expand All @@ -315,3 +324,4 @@ def test_producer_flush(self) -> None:
producer.produce(topic="topic-1", key="key-1", value="value-1")
msg = producer.flush()
self.assertIsNotNone(msg)
self._assert_topic("topic-1")

0 comments on commit 47a70f4

Please sign in to comment.