feat(confluent-kafka): Add instrumentation to consume method (#1786)
Co-authored-by: Diego Hurtado <[email protected]>
javferrod and ocelotl authored Aug 30, 2023
1 parent 0871dd4 commit d854c52
Showing 5 changed files with 271 additions and 25 deletions.
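
For orientation, here is a minimal usage sketch of the instrumented consumer (broker address, group id, and topic are placeholders; the call pattern mirrors the tests added below):

from confluent_kafka import Consumer

from opentelemetry.instrumentation.confluent_kafka import (
    ConfluentKafkaInstrumentor,
)

# Placeholder connection settings for illustration.
consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "mygroup",
        "auto.offset.reset": "earliest",
    }
)

# Returns a ProxiedConsumer whose poll() and, with this commit, consume()
# calls are traced.
consumer = ConfluentKafkaInstrumentor().instrument_consumer(consumer)
consumer.subscribe(["mytopic"])

# Each call emits a "recv" span; a "<topic> process" span covers the batch.
records = consumer.consume(num_messages=10, timeout=1.0)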
CHANGELOG.md: 4 additions & 0 deletions
@@ -61,6 +61,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
- Add optional distro and configurator selection for auto-instrumentation
([#1823](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1823))

### Added
- `opentelemetry-instrumentation-confluent-kafka` Add instrumentation to `consume` method
([#1786](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1786))

## Version 1.18.0/0.39b0 (2023-05-10)

- Update runtime metrics to follow semantic conventions
instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py
@@ -112,6 +112,8 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
from .package import _instruments
from .utils import (
KafkaPropertiesExtractor,
_end_current_consume_span,
_create_new_consume_span,
_enrich_span,
_get_span_name,
_kafka_getter,
@@ -137,6 +139,12 @@ def __init__(self, config):
def poll(self, timeout=-1): # pylint: disable=useless-super-delegation
return super().poll(timeout)

# This method is deliberately implemented in order to allow wrapt to wrap this function
def consume(
self, *args, **kwargs
): # pylint: disable=useless-super-delegation
return super().consume(*args, **kwargs)


class ProxiedProducer(Producer):
def __init__(self, producer: Producer, tracer: Tracer):
@@ -177,10 +185,14 @@ def committed(self, partitions, timeout=-1):
def commit(self, *args, **kwargs):
return self._consumer.commit(*args, **kwargs)

-    def consume(
-        self, num_messages=1, *args, **kwargs
-    ):  # pylint: disable=keyword-arg-before-vararg
-        return self._consumer.consume(num_messages, *args, **kwargs)
+    def consume(self, *args, **kwargs):
+        return ConfluentKafkaInstrumentor.wrap_consume(
+            self._consumer.consume,
+            self,
+            self._tracer,
+            args,
+            kwargs,
+        )

def get_watermark_offsets(
self, partition, timeout=-1, *args, **kwargs
@@ -275,6 +287,11 @@ def _inner_wrap_poll(func, instance, args, kwargs):
func, instance, self._tracer, args, kwargs
)

def _inner_wrap_consume(func, instance, args, kwargs):
return ConfluentKafkaInstrumentor.wrap_consume(
func, instance, self._tracer, args, kwargs
)

wrapt.wrap_function_wrapper(
AutoInstrumentedProducer,
"produce",
@@ -287,6 +304,12 @@ def _inner_wrap_poll(func, instance, args, kwargs):
_inner_wrap_poll,
)

wrapt.wrap_function_wrapper(
AutoInstrumentedConsumer,
"consume",
_inner_wrap_consume,
)
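
As a reminder of the wrapt convention these hooks rely on (a generic sketch, not code from this commit): wrap_function_wrapper replaces the named method with a wrapper that receives the original function, the bound instance, and the call arguments, which is exactly the (func, instance, args, kwargs) signature _inner_wrap_consume forwards to the static wrap_consume along with the instrumentor's tracer.

import wrapt

class Client:
    def fetch(self, n=1):
        return list(range(n))

def traced_fetch(func, instance, args, kwargs):
    # func is the original method, instance the Client object it is bound to.
    print(f"calling {func.__name__} on {type(instance).__name__}")
    return func(*args, **kwargs)

wrapt.wrap_function_wrapper(Client, "fetch", traced_fetch)
print(Client().fetch(2))  # prints the trace line, then [0, 1]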

def _uninstrument(self, **kwargs):
confluent_kafka.Producer = self._original_kafka_producer
confluent_kafka.Consumer = self._original_kafka_consumer
@@ -326,29 +349,14 @@ def wrap_produce(func, instance, tracer, args, kwargs):
@staticmethod
def wrap_poll(func, instance, tracer, args, kwargs):
if instance._current_consume_span:
-            context.detach(instance._current_context_token)
-            instance._current_context_token = None
-            instance._current_consume_span.end()
-            instance._current_consume_span = None
+            _end_current_consume_span(instance)

with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
record = func(*args, **kwargs)
if record:
-                links = []
-                ctx = propagate.extract(record.headers(), getter=_kafka_getter)
-                if ctx:
-                    for item in ctx.values():
-                        if hasattr(item, "get_span_context"):
-                            links.append(Link(context=item.get_span_context()))
-
-                instance._current_consume_span = tracer.start_span(
-                    name=f"{record.topic()} process",
-                    links=links,
-                    kind=SpanKind.CONSUMER,
-                )
-
+                _create_new_consume_span(instance, tracer, [record])
_enrich_span(
instance._current_consume_span,
record.topic(),
@@ -361,3 +369,26 @@ def wrap_poll(func, instance, tracer, args, kwargs):
)

return record

@staticmethod
def wrap_consume(func, instance, tracer, args, kwargs):
if instance._current_consume_span:
_end_current_consume_span(instance)

with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
records = func(*args, **kwargs)
if len(records) > 0:
_create_new_consume_span(instance, tracer, records)
_enrich_span(
instance._current_consume_span,
records[0].topic(),
operation=MessagingOperationValues.PROCESS,
)

instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
)

return records
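
Note the lifecycle wrap_consume implements: the "<topic> process" span is left open and attached to the current context when consume() returns, so the caller's processing of the batch is recorded under it, and the span is only ended by _end_current_consume_span on the next consume() or poll() call. The whole batch shares one span, named after the first record's topic, with links gathered from every record's headers. A sketch of the resulting sequence, assuming an instrumented consumer and a hypothetical handle() function:

records = consumer.consume(3)  # emits "recv", opens and attaches "topic-1 process"
handle(records)                # runs inside the still-open "topic-1 process" span
records = consumer.consume(1)  # ends the previous span, then repeats the cycle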
instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py
@@ -1,6 +1,8 @@
from logging import getLogger
from typing import List, Optional

from opentelemetry import context, propagate
from opentelemetry.trace import SpanKind, Link
from opentelemetry.propagators import textmap
from opentelemetry.semconv.trace import (
MessagingDestinationKindValues,
@@ -81,6 +83,34 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:
_kafka_getter = KafkaContextGetter()


def _end_current_consume_span(instance):
context.detach(instance._current_context_token)
instance._current_context_token = None
instance._current_consume_span.end()
instance._current_consume_span = None


def _create_new_consume_span(instance, tracer, records):
links = _get_links_from_records(records)
instance._current_consume_span = tracer.start_span(
name=f"{records[0].topic()} process",
links=links,
kind=SpanKind.CONSUMER,
)


def _get_links_from_records(records):
links = []
for record in records:
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
if ctx:
for item in ctx.values():
if hasattr(item, "get_span_context"):
links.append(Link(context=item.get_span_context()))

return links
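
_get_links_from_records is what ties a consumer batch back to its producers: each record's headers are run through the configured propagator, and any span context found becomes a link on the "process" span. A standalone sketch of the same extraction, assuming the default W3C trace context propagator and a header list shaped the way confluent_kafka returns it (the traceparent value is a made-up but validly formatted example):

from opentelemetry import propagate
from opentelemetry.trace import Link

# Hypothetical headers as confluent_kafka returns them: (key, value) tuples.
headers = [
    ("traceparent", b"00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
]

ctx = propagate.extract(headers, getter=_kafka_getter)
links = [
    Link(context=item.get_span_context())
    for item in ctx.values()
    if hasattr(item, "get_span_context")
]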


def _enrich_span(
span,
topic,
@@ -94,7 +124,7 @@ def _enrich_span(
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka")
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic)

-    if partition:
+    if partition is not None:
span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition)

span.set_attribute(
@@ -109,7 +139,7 @@

# https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
# A message within Kafka is uniquely defined by its topic name, topic partition and offset.
-    if partition and offset and topic:
+    if partition is not None and offset is not None and topic:
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID,
f"{topic}.{partition}.{offset}",
        )
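
Both truthiness checks were replaced with explicit None comparisons because 0 is a valid Kafka partition and a valid offset, yet falsy in Python, so the old code silently dropped these attributes for the first partition or the first message of a partition. For example:

topic, partition, offset = "mytopic", 0, 0  # first partition, first message
# Old check: `if partition and offset and topic:` is False here, no id set.
# New check: all values are not None, so the id is recorded.
message_id = f"{topic}.{partition}.{offset}"  # "mytopic.0.0"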
instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_confluent_kafka.py
@@ -14,7 +14,12 @@

# pylint: disable=no-name-in-module

-from unittest import TestCase
+from opentelemetry.semconv.trace import (
+    SpanAttributes,
+    MessagingDestinationKindValues,
+)
+from opentelemetry.test.test_base import TestBase
+from .utils import MockConsumer, MockedMessage

from confluent_kafka import Consumer, Producer

@@ -29,7 +34,7 @@
)


-class TestConfluentKafka(TestCase):
+class TestConfluentKafka(TestBase):
def test_instrument_api(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()

@@ -104,3 +109,140 @@ def test_context_getter(self) -> None:
context_setter.set(carrier_list, "key1", "val1")
self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"])
self.assertEqual(["key1"], context_getter.keys(carrier_list))

def test_poll(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
mocked_messages = [
MockedMessage("topic-10", 0, 0, []),
MockedMessage("topic-20", 2, 4, []),
MockedMessage("topic-30", 1, 3, []),
]
expected_spans = [
{"name": "recv", "attributes": {}},
{
"name": "topic-10 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_KAFKA_PARTITION: 0,
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-10",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0",
},
},
{"name": "recv", "attributes": {}},
{
"name": "topic-20 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_KAFKA_PARTITION: 2,
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-20",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4",
},
},
{"name": "recv", "attributes": {}},
{
"name": "topic-30 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_KAFKA_PARTITION: 1,
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-30",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3",
},
},
{"name": "recv", "attributes": {}},
]

consumer = MockConsumer(
mocked_messages,
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
},
)
self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.poll()
consumer.poll()
consumer.poll()
consumer.poll()

span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def test_consume(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
mocked_messages = [
MockedMessage("topic-1", 0, 0, []),
MockedMessage("topic-1", 2, 1, []),
MockedMessage("topic-1", 3, 2, []),
MockedMessage("topic-2", 0, 0, []),
MockedMessage("topic-3", 0, 3, []),
MockedMessage("topic-2", 0, 1, []),
]
expected_spans = [
{"name": "recv", "attributes": {}},
{
"name": "topic-1 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-1",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
},
},
{"name": "recv", "attributes": {}},
{
"name": "topic-2 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-2",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
},
},
{"name": "recv", "attributes": {}},
{
"name": "topic-3 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-3",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
},
},
{"name": "recv", "attributes": {}},
]

consumer = MockConsumer(
mocked_messages,
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
},
)

self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.consume(3)
consumer.consume(1)
consumer.consume(2)
consumer.consume(1)
span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def _compare_spans(self, spans, expected_spans):
for span, expected_span in zip(spans, expected_spans):
self.assertEqual(expected_span["name"], span.name)
for attribute_key, expected_attribute_value in expected_span[
"attributes"
].items():
self.assertEqual(
expected_attribute_value, span.attributes[attribute_key]
)
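
The fifth changed file, the tests' utils module that provides MockConsumer and MockedMessage, is not shown above. A plausible reconstruction from how the tests use these helpers (names and behavior inferred from this diff, not copied from the commit):

from confluent_kafka import Consumer

class MockConsumer(Consumer):
    def __init__(self, queue, config):
        self._queue = queue
        super().__init__(config)

    def consume(self, num_messages=1, *args, **kwargs):
        # Hand back the next num_messages mocked messages, as consume() would.
        messages = self._queue[:num_messages]
        self._queue = self._queue[num_messages:]
        return messages

    def poll(self, timeout=None):
        # Hand back one mocked message per call, as poll() would.
        return self._queue.pop(0) if self._queue else None

class MockedMessage:
    def __init__(self, topic, partition, offset, headers):
        self._topic = topic
        self._partition = partition
        self._offset = offset
        self._headers = headers

    def topic(self):
        return self._topic

    def partition(self):
        return self._partition

    def offset(self):
        return self._offset

    def headers(self):
        return self._headers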