Skip to content

Commit

Permalink
chore(confluent-kafka): Add test for poll and consume
Browse files Browse the repository at this point in the history
  • Loading branch information
javferrod committed May 20, 2023
1 parent f042dea commit 7e25294
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

# pylint: disable=no-name-in-module

from unittest import TestCase
from unittest.mock import patch

from opentelemetry.semconv.trace import SpanAttributes, MessagingDestinationKindValues
from opentelemetry.test.test_base import TestBase
from .utils import MockConsumer, MockedMessage

from confluent_kafka import Consumer, Producer

Expand All @@ -29,7 +33,7 @@
)


class TestConfluentKafka(TestCase):
class TestConfluentKafka(TestBase):
def test_instrument_api(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()

Expand Down Expand Up @@ -104,3 +108,160 @@ def test_context_getter(self) -> None:
context_setter.set(carrier_list, "key1", "val1")
self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"])
self.assertEqual(["key1"], context_getter.keys(carrier_list))

def test_poll(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
mocked_messages = [
MockedMessage("topic-10", 0, 0, []),
MockedMessage("topic-20", 2, 4, []),
MockedMessage("topic-30", 1, 3, []),
]
expected_spans= [
{
"name": "recv",
"attributes": {}
},
{
"name": "topic-10 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_KAFKA_PARTITION: 0,
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-10",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0",
}
},
{
"name": "recv",
"attributes": {}
},
{
"name": "topic-20 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_KAFKA_PARTITION: 2,
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-20",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4",
}
},
{
"name": "recv",
"attributes": {}
},
{
"name": "topic-30 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_KAFKA_PARTITION: 1,
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-30",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3",
}
},
{
"name": "recv",
"attributes": {}
},
]

consumer = MockConsumer(
mocked_messages,
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
}
)
span_list = self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.poll(1)
consumer.poll(1)
consumer.poll(1)
consumer.poll(1)

span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def test_consume(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
mocked_messages = [
MockedMessage("topic-1", 0, 0, []),
MockedMessage("topic-1", 2, 1, []),
MockedMessage("topic-1", 3, 2, []),
MockedMessage("topic-2", 0, 0, []),
MockedMessage("topic-3", 0, 3, []),
MockedMessage("topic-2", 0, 1, []),
]
expected_spans= [
{
"name": "recv",
"attributes": {}
},
{
"name": "topic-1 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-1",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
}
},
{
"name": "recv",
"attributes": {}
},
{
"name": "topic-2 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-2",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
}
},
{
"name": "recv",
"attributes": {}
},
{
"name": "topic-3 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-3",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
}
},
{
"name": "recv",
"attributes": {}
},
]

consumer = MockConsumer(
mocked_messages,
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
}
)

span_list = self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.consume(3)
consumer.consume(1)
consumer.consume(2)
consumer.consume(1)
span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def _compare_spans(self, spans, expected_spans):
for (span, expected_span) in zip(spans, expected_spans):
self.assertEqual(expected_span['name'], span.name)
for attribute_key, expected_attribute_value in expected_span['attributes'].items():
self.assertEqual(expected_attribute_value, span.attributes[attribute_key])
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from confluent_kafka import Consumer


class MockConsumer(Consumer):

def __init__(self, queue, config):
self._queue = queue
super().__init__(config)

def consume(self, num_messages=1, *args, **kwargs):
messages = self._queue[:num_messages]
self._queue = self._queue[num_messages:]
return messages

def poll(self, timeout=None):
if len(self._queue) > 0:
return self._queue.pop(0)
else:
return None


class MockedMessage:
def __init__(self, topic: str, partition: int, offset: int, headers):
self._topic = topic
self._partition = partition
self._offset = offset
self._headers = headers

def topic(self):
return self._topic

def partition(self):
return self._partition

def offset(self):
return self._offset

def headers(self):
return self._headers

0 comments on commit 7e25294

Please sign in to comment.