diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7d56bcc43b..147202d211 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -61,6 +61,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Add optional distro and configurator selection for auto-instrumentation
   ([#1823](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1823))
 
+### Added
+- `opentelemetry-instrumentation-confluent-kafka` Add instrumentation to `consume` method
+  ([#1786](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1786))
+
 ## Version 1.18.0/0.39b0 (2023-05-10)
 
 - Update runtime metrics to follow semantic conventions
diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py
index 12cb363219..c4e68b33b4 100644
--- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py
+++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py
@@ -112,6 +112,8 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
 from .package import _instruments
 from .utils import (
     KafkaPropertiesExtractor,
+    _end_current_consume_span,
+    _create_new_consume_span,
     _enrich_span,
     _get_span_name,
     _kafka_getter,
@@ -137,6 +139,12 @@ def __init__(self, config):
     def poll(self, timeout=-1):  # pylint: disable=useless-super-delegation
         return super().poll(timeout)
 
+    # This method is deliberately implemented in order to allow wrapt to wrap this function
+    def consume(
+        self, *args, **kwargs
+    ):  # pylint: disable=useless-super-delegation
+        return super().consume(*args, **kwargs)
+
 
 class ProxiedProducer(Producer):
     def __init__(self, producer: Producer, tracer: Tracer):
@@ -177,10 +185,14 @@ def committed(self, partitions, timeout=-1):
     def commit(self, *args, **kwargs):
         return self._consumer.commit(*args, **kwargs)
 
-    def consume(
-        self, num_messages=1, *args, **kwargs
-    ):  # pylint: disable=keyword-arg-before-vararg
-        return self._consumer.consume(num_messages, *args, **kwargs)
+    def consume(self, *args, **kwargs):
+        return ConfluentKafkaInstrumentor.wrap_consume(
+            self._consumer.consume,
+            self,
+            self._tracer,
+            args,
+            kwargs,
+        )
 
     def get_watermark_offsets(
         self, partition, timeout=-1, *args, **kwargs
@@ -275,6 +287,11 @@ def _inner_wrap_poll(func, instance, args, kwargs):
                 func, instance, self._tracer, args, kwargs
             )
 
+        def _inner_wrap_consume(func, instance, args, kwargs):
+            return ConfluentKafkaInstrumentor.wrap_consume(
+                func, instance, self._tracer, args, kwargs
+            )
+
         wrapt.wrap_function_wrapper(
             AutoInstrumentedProducer,
             "produce",
@@ -287,6 +304,12 @@ def _inner_wrap_poll(func, instance, args, kwargs):
             _inner_wrap_poll,
         )
 
+        wrapt.wrap_function_wrapper(
+            AutoInstrumentedConsumer,
+            "consume",
+            _inner_wrap_consume,
+        )
+
     def _uninstrument(self, **kwargs):
         confluent_kafka.Producer = self._original_kafka_producer
         confluent_kafka.Consumer = self._original_kafka_consumer
@@ -326,29 +349,14 @@ def wrap_produce(func, instance, tracer, args, kwargs):
     @staticmethod
     def wrap_poll(func, instance, tracer, args, kwargs):
         if instance._current_consume_span:
-            context.detach(instance._current_context_token)
-            instance._current_context_token = None
-            instance._current_consume_span.end()
-            instance._current_consume_span = None
+            _end_current_consume_span(instance)
 
         with tracer.start_as_current_span(
             "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
         ):
             record = func(*args, **kwargs)
             if record:
-                links = []
-                ctx = propagate.extract(record.headers(), getter=_kafka_getter)
-                if ctx:
-                    for item in ctx.values():
-                        if hasattr(item, "get_span_context"):
-                            links.append(Link(context=item.get_span_context()))
-
-                instance._current_consume_span = tracer.start_span(
-                    name=f"{record.topic()} process",
-                    links=links,
-                    kind=SpanKind.CONSUMER,
-                )
-
+                _create_new_consume_span(instance, tracer, [record])
                 _enrich_span(
                     instance._current_consume_span,
                     record.topic(),
@@ -361,3 +369,26 @@ def wrap_poll(func, instance, tracer, args, kwargs):
             )
 
         return record
+
+    @staticmethod
+    def wrap_consume(func, instance, tracer, args, kwargs):
+        if instance._current_consume_span:
+            _end_current_consume_span(instance)
+
+        with tracer.start_as_current_span(
+            "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
+        ):
+            records = func(*args, **kwargs)
+            if len(records) > 0:
+                _create_new_consume_span(instance, tracer, records)
+                _enrich_span(
+                    instance._current_consume_span,
+                    records[0].topic(),
+                    operation=MessagingOperationValues.PROCESS,
+                )
+
+        instance._current_context_token = context.attach(
+            trace.set_span_in_context(instance._current_consume_span)
+        )
+
+        return records
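Note on the consume-side span model above: `wrap_consume` mirrors `wrap_poll`: it lazily ends the previous batch's `"<topic> process"` span, records a short CONSUMER `"recv"` span around the broker call, and leaves a new process span attached to the current context until the next `consume()`/`poll()` call ends it. A minimal usage sketch of the instrumented flow (the broker address, group id, and `handle` callback are illustrative placeholders, not part of this change):

```python
from confluent_kafka import Consumer

from opentelemetry.instrumentation.confluent_kafka import (
    ConfluentKafkaInstrumentor,
)


def handle(msg):  # placeholder processing callback
    print(msg.topic(), msg.partition(), msg.offset())


consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "group.id": "demo-group",  # placeholder group id
        "auto.offset.reset": "earliest",
    }
)
consumer = ConfluentKafkaInstrumentor().instrument_consumer(consumer)

for _ in range(3):
    # Each call ends the previous "<topic> process" span (if any), emits a
    # "recv" span around the broker call, and starts a new process span that
    # stays current while the application handles the returned batch.
    records = consumer.consume(num_messages=10, timeout=1.0)
    for record in records:
        handle(record)
```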
diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py
index 77fce03cd8..2029960703 100644
--- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py
+++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py
@@ -1,6 +1,8 @@
 from logging import getLogger
 from typing import List, Optional
 
+from opentelemetry import context, propagate
+from opentelemetry.trace import SpanKind, Link
 from opentelemetry.propagators import textmap
 from opentelemetry.semconv.trace import (
     MessagingDestinationKindValues,
@@ -81,6 +83,34 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:
 _kafka_getter = KafkaContextGetter()
 
 
+def _end_current_consume_span(instance):
+    context.detach(instance._current_context_token)
+    instance._current_context_token = None
+    instance._current_consume_span.end()
+    instance._current_consume_span = None
+
+
+def _create_new_consume_span(instance, tracer, records):
+    links = _get_links_from_records(records)
+    instance._current_consume_span = tracer.start_span(
+        name=f"{records[0].topic()} process",
+        links=links,
+        kind=SpanKind.CONSUMER,
+    )
+
+
+def _get_links_from_records(records):
+    links = []
+    for record in records:
+        ctx = propagate.extract(record.headers(), getter=_kafka_getter)
+        if ctx:
+            for item in ctx.values():
+                if hasattr(item, "get_span_context"):
+                    links.append(Link(context=item.get_span_context()))
+
+    return links
+
+
 def _enrich_span(
     span,
     topic,
@@ -94,7 +124,7 @@ def _enrich_span(
     span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka")
     span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic)
 
-    if partition:
+    if partition is not None:
         span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition)
 
     span.set_attribute(
@@ -109,7 +139,7 @@ def _enrich_span(
     # https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
     # A message within Kafka is uniquely defined by its topic name, topic partition and offset.
-    if partition and offset and topic:
+    if partition is not None and offset is not None and topic:
         span.set_attribute(
             SpanAttributes.MESSAGING_MESSAGE_ID,
             f"{topic}.{partition}.{offset}",
         )
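The switch from truthiness to `is not None` checks in `_enrich_span` matters because partition `0` and offset `0` are valid Kafka coordinates but falsy in Python, so the old conditions silently dropped the partition attribute and the message id for them. A self-contained illustration:

```python
# Old check: skipped attributes for the first message on the first partition.
partition, offset, topic = 0, 0, "topic-10"

assert not (partition and offset and topic)  # old condition: falsy, skipped
assert partition is not None and offset is not None and topic  # new: passes

# With the fix, such a message gets a well-formed message id.
print(f"{topic}.{partition}.{offset}")  # -> topic-10.0.0
```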
diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py
index 1e3f304188..d7ac343dbf 100644
--- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py
+++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py
@@ -14,7 +14,12 @@
 
 # pylint: disable=no-name-in-module
 
-from unittest import TestCase
+from opentelemetry.semconv.trace import (
+    SpanAttributes,
+    MessagingDestinationKindValues,
+)
+from opentelemetry.test.test_base import TestBase
+from .utils import MockConsumer, MockedMessage
 
 from confluent_kafka import Consumer, Producer
 
@@ -29,7 +34,7 @@
 )
 
 
-class TestConfluentKafka(TestCase):
+class TestConfluentKafka(TestBase):
     def test_instrument_api(self) -> None:
         instrumentation = ConfluentKafkaInstrumentor()
 
@@ -104,3 +109,140 @@ def test_context_getter(self) -> None:
         context_setter.set(carrier_list, "key1", "val1")
         self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"])
         self.assertEqual(["key1"], context_getter.keys(carrier_list))
+
+    def test_poll(self) -> None:
+        instrumentation = ConfluentKafkaInstrumentor()
+        mocked_messages = [
+            MockedMessage("topic-10", 0, 0, []),
+            MockedMessage("topic-20", 2, 4, []),
+            MockedMessage("topic-30", 1, 3, []),
+        ]
+        expected_spans = [
+            {"name": "recv", "attributes": {}},
+            {
+                "name": "topic-10 process",
+                "attributes": {
+                    SpanAttributes.MESSAGING_OPERATION: "process",
+                    SpanAttributes.MESSAGING_KAFKA_PARTITION: 0,
+                    SpanAttributes.MESSAGING_SYSTEM: "kafka",
+                    SpanAttributes.MESSAGING_DESTINATION: "topic-10",
+                    SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
+                    SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0",
+                },
+            },
+            {"name": "recv", "attributes": {}},
+            {
+                "name": "topic-20 process",
+                "attributes": {
+                    SpanAttributes.MESSAGING_OPERATION: "process",
+                    SpanAttributes.MESSAGING_KAFKA_PARTITION: 2,
+                    SpanAttributes.MESSAGING_SYSTEM: "kafka",
+                    SpanAttributes.MESSAGING_DESTINATION: "topic-20",
+                    SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
+                    SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4",
+                },
+            },
+            {"name": "recv", "attributes": {}},
+            {
+                "name": "topic-30 process",
+                "attributes": {
+                    SpanAttributes.MESSAGING_OPERATION: "process",
+                    SpanAttributes.MESSAGING_KAFKA_PARTITION: 1,
+                    SpanAttributes.MESSAGING_SYSTEM: "kafka",
+                    SpanAttributes.MESSAGING_DESTINATION: "topic-30",
+                    SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
+                    SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3",
+                },
+            },
+            {"name": "recv", "attributes": {}},
+        ]
+
+        consumer = MockConsumer(
+            mocked_messages,
+            {
+                "bootstrap.servers": "localhost:29092",
+                "group.id": "mygroup",
+                "auto.offset.reset": "earliest",
+            },
+        )
+        self.memory_exporter.clear()
+        consumer = instrumentation.instrument_consumer(consumer)
+        consumer.poll()
+        consumer.poll()
+        consumer.poll()
+        consumer.poll()
+
+        span_list = self.memory_exporter.get_finished_spans()
+        self._compare_spans(span_list, expected_spans)
+
+    def test_consume(self) -> None:
+        instrumentation = ConfluentKafkaInstrumentor()
+        mocked_messages = [
+            MockedMessage("topic-1", 0, 0, []),
+            MockedMessage("topic-1", 2, 1, []),
+            MockedMessage("topic-1", 3, 2, []),
+            MockedMessage("topic-2", 0, 0, []),
+            MockedMessage("topic-3", 0, 3, []),
+            MockedMessage("topic-2", 0, 1, []),
+        ]
+        expected_spans = [
+            {"name": "recv", "attributes": {}},
+            {
+                "name": "topic-1 process",
+                "attributes": {
+                    SpanAttributes.MESSAGING_OPERATION: "process",
+                    SpanAttributes.MESSAGING_SYSTEM: "kafka",
+                    SpanAttributes.MESSAGING_DESTINATION: "topic-1",
+                    SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
+                },
+            },
+            {"name": "recv", "attributes": {}},
+            {
+                "name": "topic-2 process",
+                "attributes": {
+                    SpanAttributes.MESSAGING_OPERATION: "process",
+                    SpanAttributes.MESSAGING_SYSTEM: "kafka",
+                    SpanAttributes.MESSAGING_DESTINATION: "topic-2",
+                    SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
+                },
+            },
+            {"name": "recv", "attributes": {}},
+            {
+                "name": "topic-3 process",
+                "attributes": {
+                    SpanAttributes.MESSAGING_OPERATION: "process",
+                    SpanAttributes.MESSAGING_SYSTEM: "kafka",
+                    SpanAttributes.MESSAGING_DESTINATION: "topic-3",
+                    SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
+                },
+            },
+            {"name": "recv", "attributes": {}},
+        ]
+
+        consumer = MockConsumer(
+            mocked_messages,
+            {
+                "bootstrap.servers": "localhost:29092",
+                "group.id": "mygroup",
+                "auto.offset.reset": "earliest",
+            },
+        )
+
+        self.memory_exporter.clear()
+        consumer = instrumentation.instrument_consumer(consumer)
+        consumer.consume(3)
+        consumer.consume(1)
+        consumer.consume(2)
+        consumer.consume(1)
+        span_list = self.memory_exporter.get_finished_spans()
+        self._compare_spans(span_list, expected_spans)
+
+    def _compare_spans(self, spans, expected_spans):
+        for span, expected_span in zip(spans, expected_spans):
+            self.assertEqual(expected_span["name"], span.name)
+            for attribute_key, expected_attribute_value in expected_span[
+                "attributes"
+            ].items():
+                self.assertEqual(
+                    expected_attribute_value, span.attributes[attribute_key]
+                )
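For reference, the finished-span order asserted in `test_consume` falls directly out of `wrap_consume`'s lazy span ending; annotated against the mocked queue above (comments only, no behavior beyond what the test already checks):

```python
consumer.consume(3)  # "recv"; starts "topic-1 process" (all 3 records linked)
consumer.consume(1)  # ends "topic-1 process"; "recv"; starts "topic-2 process"
consumer.consume(2)  # ends "topic-2 process"; "recv"; starts "topic-3 process"
                     # (a batch span is named after its first record's topic)
consumer.consume(1)  # ends "topic-3 process"; "recv" only (queue is now empty)
```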
diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py
new file mode 100644
index 0000000000..798daaeff4
--- /dev/null
+++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py
@@ -0,0 +1,39 @@
+from confluent_kafka import Consumer
+
+
+class MockConsumer(Consumer):
+    def __init__(self, queue, config):
+        self._queue = queue
+        super().__init__(config)
+
+    def consume(
+        self, num_messages=1, *args, **kwargs
+    ):  # pylint: disable=keyword-arg-before-vararg
+        messages = self._queue[:num_messages]
+        self._queue = self._queue[num_messages:]
+        return messages
+
+    def poll(self, timeout=None):
+        if len(self._queue) > 0:
+            return self._queue.pop(0)
+        return None
+
+
+class MockedMessage:
+    def __init__(self, topic: str, partition: int, offset: int, headers):
+        self._topic = topic
+        self._partition = partition
+        self._offset = offset
+        self._headers = headers
+
+    def topic(self):
+        return self._topic
+
+    def partition(self):
+        return self._partition
+
+    def offset(self):
+        return self._offset
+
+    def headers(self):
+        return self._headers
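One detail the mocks leave implicit: every `MockedMessage` in these tests is created with empty headers, so `_get_links_from_records` never yields links here. A sketch of a context-carrying message, assuming confluent-kafka's `(key, bytes-value)` header shape that `KafkaContextGetter` decodes, and the default W3C tracecontext propagator (the traceparent value is the W3C spec example, used purely for illustration):

```python
from opentelemetry.instrumentation.confluent_kafka.utils import (
    _get_links_from_records,
)

from .utils import MockedMessage  # as in the tests above

# Hypothetical message whose producer injected W3C trace context.
msg = MockedMessage(
    "topic-1",
    0,
    0,
    [
        (
            "traceparent",
            b"00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
        )
    ],
)

# Each record carrying extractable context contributes one link to the
# batch's "<topic> process" span.
links = _get_links_from_records([msg])
assert len(links) == 1
```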