diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py index 3f9bd6f39c..f3250ff487 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py @@ -24,6 +24,14 @@ def _extract_argument(key, position, default_value, args, kwargs): return args[position] return kwargs.get(key, default_value) + @staticmethod + def has_headers_in_args(args): + return len(args) > 3 + + @staticmethod + def replace_headers_in_args(args, new_headers) -> tuple: + return *args[:3], new_headers, *args[4:] + @staticmethod def extract_send_topic(args, kwargs): """extract topic from `send` method arguments in KafkaProducer class""" @@ -143,7 +151,13 @@ def _traced_send(func, instance, args, kwargs): headers = KafkaPropertiesExtractor.extract_send_headers(args, kwargs) if headers is None: headers = [] - kwargs["headers"] = headers + + if KafkaPropertiesExtractor.has_headers_in_args(args): + args = KafkaPropertiesExtractor.replace_headers_in_args( + args, headers + ) + else: + kwargs["headers"] = headers topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs) bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers( diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_instrumentation.py index 7bc724fb78..936a4fed1c 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_instrumentation.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. from unittest import TestCase +from unittest.mock import Mock, patch + +import kafka from kafka import KafkaConsumer, KafkaProducer from wrapt import BoundFunctionWrapper @@ -34,3 +37,22 @@ def test_instrument_api(self) -> None: self.assertFalse( isinstance(KafkaConsumer.__next__, BoundFunctionWrapper) ) + + def test_kafka_producer_send_arguments(self) -> None: + instrumentation = KafkaInstrumentor() + + class MockedKafkaProducer(Mock): + def send(self, topic, value=None, key=None, headers=None, + partition=None, timestamp_ms=None): + pass + + with patch.object(kafka, "KafkaProducer", MockedKafkaProducer): + instrumentation.instrument() + producer = kafka.KafkaProducer() + producer.send('test-topic', b'message') + producer.send('test-topic', b'message', None, None, None, None) + producer.send( + 'test-topic', b'message', None, + headers=[("system", "amd64")] + ) + instrumentation.uninstrument()