Skip to content

Commit

Permalink
feat(confluent-kafka): Add instrumentation to consume method
Browse files Browse the repository at this point in the history
  • Loading branch information
javferrod committed May 6, 2023
1 parent 890e5dd commit f6e5f98
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 15 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1407](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1407))
- `opentelemetry-instrumentation-logging` Add `otelTraceSampled` to instrumetation-logging
([#1773](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1773))

- `opentelemetry-instrumentation-kafka-python` Add instrumentation to `consume` method

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
from .utils import (
KafkaPropertiesExtractor,
_enrich_span,
_get_links_from_records,
_get_span_name,
_kafka_getter,
_kafka_setter,
Expand All @@ -136,6 +137,10 @@ def __init__(self, config):
# This method is deliberately implemented in order to allow wrapt to wrap this function
def poll(self, timeout=-1): # pylint: disable=useless-super-delegation
return super().poll(timeout)

# This method is deliberately implemented in order to allow wrapt to wrap this function
def consume(self, *args, **kwargs): # pylint: disable=useless-super-delegation
return super().consume(*args, **kwargs)


class ProxiedProducer(Producer):
Expand Down Expand Up @@ -178,9 +183,11 @@ def commit(self, *args, **kwargs):
return self._consumer.commit(*args, **kwargs)

def consume(
self, num_messages=1, *args, **kwargs
self, *args, **kwargs
): # pylint: disable=keyword-arg-before-vararg
return self._consumer.consume(num_messages, *args, **kwargs)
return ConfluentKafkaInstrumentor.wrap_consume(
self._consumer.consume, self, self._tracer, args, kwargs,
)

def get_watermark_offsets(
self, partition, timeout=-1, *args, **kwargs
Expand Down Expand Up @@ -274,6 +281,11 @@ def _inner_wrap_poll(func, instance, args, kwargs):
return ConfluentKafkaInstrumentor.wrap_poll(
func, instance, self._tracer, args, kwargs
)

def _inner_wrap_consume(func, instance, args, kwargs):
return ConfluentKafkaInstrumentor.wrap_consume(
func, instance, self._tracer, args, kwargs
)

wrapt.wrap_function_wrapper(
AutoInstrumentedProducer,
Expand All @@ -286,6 +298,12 @@ def _inner_wrap_poll(func, instance, args, kwargs):
"poll",
_inner_wrap_poll,
)

wrapt.wrap_function_wrapper(
AutoInstrumentedConsumer,
"consume",
_inner_wrap_consume,
)

def _uninstrument(self, **kwargs):
confluent_kafka.Producer = self._original_kafka_producer
Expand Down Expand Up @@ -336,13 +354,7 @@ def wrap_poll(func, instance, tracer, args, kwargs):
):
record = func(*args, **kwargs)
if record:
links = []
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
if ctx:
for item in ctx.values():
if hasattr(item, "get_span_context"):
links.append(Link(context=item.get_span_context()))

links = _get_links_from_records([record])
instance._current_consume_span = tracer.start_span(
name=f"{record.topic()} process",
links=links,
Expand All @@ -361,3 +373,35 @@ def wrap_poll(func, instance, tracer, args, kwargs):
)

return record

@staticmethod
def wrap_consume(func, instance, tracer, args, kwargs):
if instance._current_consume_span:
context.detach(instance._current_context_token)
instance._current_context_token = None
instance._current_consume_span.end()
instance._current_consume_span = None

with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
records = func(*args, **kwargs)
if len(records) > 0:
links = _get_links_from_records(records)
instance._current_consume_span = tracer.start_span(
name=f"{records[0].topic()} process",
links=links,
kind=SpanKind.CONSUMER,
)

_enrich_span(
instance._current_consume_span,
records[0].topic(),
operation=MessagingOperationValues.PROCESS,
)

instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
)

return records
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from logging import getLogger
from typing import List, Optional

from opentelemetry import propagate
from opentelemetry.trace import SpanKind, Link
from opentelemetry.propagators import textmap
from opentelemetry.semconv.trace import (
MessagingDestinationKindValues,
Expand Down Expand Up @@ -82,11 +84,11 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:


def _enrich_span(
span,
topic,
partition: Optional[int] = None,
offset: Optional[int] = None,
operation: Optional[MessagingOperationValues] = None,
span,
topic,
partition: Optional[int] = None,
offset: Optional[int] = None,
operation: Optional[MessagingOperationValues] = None,
):
if not span.is_recording():
return
Expand Down Expand Up @@ -116,6 +118,18 @@ def _enrich_span(
)


def _get_links_from_records(records):
links = []
for record in records:
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
if ctx:
for item in ctx.values():
if hasattr(item, "get_span_context"):
links.append(Link(context=item.get_span_context()))

return links


_kafka_setter = KafkaContextSetter()


Expand Down

0 comments on commit f6e5f98

Please sign in to comment.