Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-flask`: Clean up environ keys in `_teardown_request` to prevent duplicate execution
([#4341](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4341))

### Fixed

- `opentelemetry-instrumentation-confluent-kafka`: Populate `server.address` and `server.port` span attributes from the producer/consumer `bootstrap.servers` config; previously `KafkaPropertiesExtractor.extract_bootstrap_servers` was defined but never called
([#4423](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4423))

### Breaking changes

- Drop Python 3.9 support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,27 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
from .version import __version__


def _capture_config(args, kwargs):
"""Return the config dict that was passed to a Producer/Consumer
constructor, regardless of whether it was supplied positionally, as
``conf=`` kwarg, or (for Consumer) expanded as **kwargs."""
if args and isinstance(args[0], dict):
return args[0]
conf = kwargs.get("conf")
if isinstance(conf, dict):
return conf
# confluent_kafka.Consumer also supports Consumer(**conf) — in that case
# the kwargs themselves are the config.
if kwargs:
return dict(kwargs)
return None


class AutoInstrumentedProducer(Producer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.config = _capture_config(args, kwargs)

# This method is deliberately implemented in order to allow wrapt to wrap this function
def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg,useless-super-delegation
super().produce(topic, value, *args, **kwargs)
Expand All @@ -136,6 +156,7 @@ def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keywor
class AutoInstrumentedConsumer(Consumer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.config = _capture_config(args, kwargs)
self._current_consume_span = None

# This method is deliberately implemented in order to allow wrapt to wrap this function
Expand All @@ -155,6 +176,10 @@ class ProxiedProducer(Producer):
def __init__(self, producer: Producer, tracer: Tracer):
self._producer = producer
self._tracer = tracer
# Surface the wrapped producer's config (if any) so that
# KafkaPropertiesExtractor.extract_bootstrap_servers can read it
# through this proxy.
self.config = getattr(producer, "config", None)

def flush(self, timeout=-1):
return self._producer.flush(timeout)
Expand Down Expand Up @@ -184,6 +209,8 @@ def __init__(self, consumer: Consumer, tracer: Tracer):
self._tracer = tracer
self._current_consume_span = None
self._current_context_token = None
# See ProxiedProducer.__init__ for rationale.
self.config = getattr(consumer, "config", None)

def close(self, *args, **kwargs):
return ConfluentKafkaInstrumentor.wrap_close(
Expand Down Expand Up @@ -367,11 +394,15 @@ def wrap_produce(func, instance, tracer, args, kwargs):
topic = KafkaPropertiesExtractor.extract_produce_topic(
args, kwargs
)
bootstrap_servers = (
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
)
_enrich_span(
span,
topic,
operation=MessagingOperationTypeValues.RECEIVE,
) # Replace
operation=MessagingOperationTypeValues.PUBLISH,
bootstrap_servers=bootstrap_servers,
) # Publish
propagate.inject(
headers,
setter=_kafka_setter,
Expand All @@ -385,6 +416,9 @@ def wrap_poll(func, instance, tracer, args, kwargs):

record = func(*args, **kwargs)
if record:
bootstrap_servers = (
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
)
with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
Expand All @@ -395,6 +429,7 @@ def wrap_poll(func, instance, tracer, args, kwargs):
record.partition(),
record.offset(),
operation=MessagingOperationTypeValues.PROCESS,
bootstrap_servers=bootstrap_servers,
)
instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
Expand All @@ -409,6 +444,9 @@ def wrap_consume(func, instance, tracer, args, kwargs):

records = func(*args, **kwargs)
if len(records) > 0:
bootstrap_servers = (
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
)
with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
Expand All @@ -417,6 +455,7 @@ def wrap_consume(func, instance, tracer, args, kwargs):
instance._current_consume_span,
records[0].topic(),
operation=MessagingOperationTypeValues.PROCESS,
bootstrap_servers=bootstrap_servers,
)
instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
MESSAGING_SYSTEM,
MessagingOperationTypeValues,
)
from opentelemetry.semconv.attributes.server_attributes import (
SERVER_ADDRESS,
SERVER_PORT,
)
from opentelemetry.semconv.trace import (
MessagingDestinationKindValues,
SpanAttributes,
Expand All @@ -21,7 +25,17 @@
class KafkaPropertiesExtractor:
@staticmethod
def extract_bootstrap_servers(instance):
return instance.config.get("bootstrap_servers")
config = getattr(instance, "config", None)
if not isinstance(config, dict):
return None
# confluent-kafka uses the dotted key "bootstrap.servers"; also accept
# the python-style "bootstrap_servers" for robustness.
servers = config.get("bootstrap.servers") or config.get(
"bootstrap_servers"
)
if isinstance(servers, (list, tuple)):
servers = ",".join(str(s) for s in servers)
return servers

@staticmethod
def _extract_argument(key, position, default_value, args, kwargs):
Expand Down Expand Up @@ -115,12 +129,35 @@ def _get_links_from_records(records):
return links


def _set_bootstrap_servers_attributes(span, bootstrap_servers):
"""Populate server.address and server.port from a bootstrap.servers
string (e.g. ``host1:9092,host2:9092``)."""
if not bootstrap_servers:
return

first_broker = bootstrap_servers.split(",")[0].strip()
if not first_broker:
return

if ":" in first_broker:
host, _, port = first_broker.rpartition(":")
span.set_attribute(SERVER_ADDRESS, host)
try:
span.set_attribute(SERVER_PORT, int(port))
except ValueError:
# Port wasn't numeric; skip rather than emit a bad attribute.
_LOG.debug("non-numeric port in bootstrap.servers: %r", port)
else:
span.set_attribute(SERVER_ADDRESS, first_broker)


def _enrich_span(
span,
topic,
partition: Optional[int] = None,
offset: Optional[int] = None,
operation: Optional[MessagingOperationTypeValues] = None,
bootstrap_servers: Optional[str] = None,
):
if not span.is_recording():
return
Expand All @@ -141,6 +178,8 @@ def _enrich_span(
else:
span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)

_set_bootstrap_servers_attributes(span, bootstrap_servers)

# https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
# A message within Kafka is uniquely defined by its topic name, topic partition and offset.
if partition is not None and offset is not None and topic:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
MESSAGING_OPERATION,
MESSAGING_SYSTEM,
)
from opentelemetry.semconv.attributes.server_attributes import (
SERVER_ADDRESS,
SERVER_PORT,
)
from opentelemetry.semconv.trace import (
MessagingDestinationKindValues,
SpanAttributes,
Expand Down Expand Up @@ -447,3 +451,45 @@ def test_producer_flush(self) -> None:
span_list = self.memory_exporter.get_finished_spans()
self._assert_span_count(span_list, 1)
self._assert_topic(span_list[0], "topic-1")

def test_producer_sets_bootstrap_servers_attributes(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
producer = MockedProducer(
[],
{
"bootstrap.servers": "broker-a:9092,broker-b:9093",
},
)

producer = instrumentation.instrument_producer(producer)
producer.produce(topic="topic-1", key="k", value="v")

span = self.memory_exporter.get_finished_spans()[0]
self.assertEqual(span.attributes[SERVER_ADDRESS], "broker-a")
self.assertEqual(span.attributes[SERVER_PORT], 9092)

def test_consumer_sets_bootstrap_servers_attributes(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
consumer = MockConsumer(
[MockedMessage("topic-1", 0, 0, [])],
{
"bootstrap.servers": "broker-1:9092",
"group.id": "g",
"auto.offset.reset": "earliest",
},
)

self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.poll()
# Second (empty) poll ends the in-flight `<topic> process` span so it
# shows up in the exporter.
consumer.poll()

process_span = next(
s
for s in self.memory_exporter.get_finished_spans()
if s.name == "topic-1 process"
)
self.assertEqual(process_span.attributes[SERVER_ADDRESS], "broker-1")
self.assertEqual(process_span.attributes[SERVER_PORT], 9092)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
class MockConsumer(Consumer):
def __init__(self, queue, config):
self._queue = queue
self.config = config
super().__init__(config)

def consume(self, num_messages=1, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
Expand Down Expand Up @@ -58,6 +59,7 @@ def value(self):
class MockedProducer(Producer):
def __init__(self, queue, config):
self._queue = queue
self.config = config
super().__init__(config)

def produce(self, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
Expand Down