diff --git a/CHANGELOG.md b/CHANGELOG.md index 068a067dd1..9496cde02b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1553](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1553)) - `opentelemetry/sdk/extension/aws` Implement [`aws.ecs.*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/ecs.md) and [`aws.logs.*`](https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/logs/) resource attributes in the `AwsEcsResourceDetector` detector when the ECS Metadata v4 is available ([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212)) +- `opentelemetry-instrumentation-aio-pika` Support `aio_pika` 8.x + ([#1481](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1481)) ### Fixed diff --git a/instrumentation/README.md b/instrumentation/README.md index a269b09397..b1482a0227 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -1,7 +1,7 @@ | Instrumentation | Supported Packages | Metrics support | | --------------- | ------------------ | --------------- | -| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika ~= 7.2.0 | No +| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika >= 7.2.0, < 9.0.0 | No | [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | No | [opentelemetry-instrumentation-aiopg](./opentelemetry-instrumentation-aiopg) | aiopg >= 0.13.0, < 2.0.0 | No | [opentelemetry-instrumentation-asgi](./opentelemetry-instrumentation-asgi) | asgiref ~= 3.0 | No diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/pyproject.toml b/instrumentation/opentelemetry-instrumentation-aio-pika/pyproject.toml index 4511254a70..994642e22a 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/pyproject.toml +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/pyproject.toml @@ -31,7 +31,7 @@ dependencies = [ [project.optional-dependencies] instruments = [ - "aio_pika ~= 7.2.0", + "aio_pika >= 7.2.0, < 9.0.0", ] test = [ "opentelemetry-instrumentation-aio-pika[instruments]", diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py index 6c7ed74ea4..285e9f99cb 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py @@ -13,4 +13,4 @@ # limitations under the License. from typing import Collection -_instruments: Collection[str] = ("aio_pika ~= 7.2.0",) +_instruments: Collection[str] = ("aio_pika >= 7.2.0, < 9.0.0",) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py index a61209e0ce..8522e2cb7c 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -49,7 +49,13 @@ def set_destination(self, destination: str): self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination def set_channel(self, channel: AbstractChannel): - url = channel.connection.connection.url + connection = channel.connection + if getattr(connection, "connection", None): + # aio_rmq 7 + url = connection.connection.url + else: + # aio_rmq 8 + url = connection.url self._attributes.update({ SpanAttributes.NET_PEER_NAME: url.host, SpanAttributes.NET_PEER_PORT: url.port diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py index ada7080192..af10a5bab6 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py @@ -15,8 +15,10 @@ SERVER_URL = URL( f"amqp://{SERVER_USER}:{SERVER_PASS}@{SERVER_HOST}:{SERVER_PORT}/" ) -CONNECTION = Namespace(connection=Namespace(url=SERVER_URL)) -CHANNEL = Namespace(connection=CONNECTION, loop=None) +CONNECTION_7 = Namespace(connection=Namespace(url=SERVER_URL)) +CONNECTION_8 = Namespace(connection=Namespace(url=SERVER_URL)) +CHANNEL_7 = Namespace(connection=CONNECTION_7, loop=None) +CHANNEL_8 = Namespace(connection=CONNECTION_8, loop=None) MESSAGE = Namespace( properties=Namespace( message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers={} diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py index 70883c116c..19aa07f953 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py @@ -23,7 +23,8 @@ from opentelemetry.trace import SpanKind, get_tracer from .consts import ( - CHANNEL, + CHANNEL_7, + CHANNEL_8, CORRELATION_ID, EXCHANGE_NAME, MESSAGE, @@ -52,7 +53,7 @@ def setUp(self): asyncio.set_event_loop(self.loop) def test_get_callback_span(self): - queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None) + queue = Queue(CHANNEL_7, QUEUE_NAME, False, False, False, None) tracer = mock.MagicMock() CallbackDecorator(tracer, queue)._get_span(MESSAGE) tracer.start_span.assert_called_once_with( @@ -62,7 +63,45 @@ def test_get_callback_span(self): ) def test_decorate_callback(self): - queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None) + queue = Queue(CHANNEL_7, QUEUE_NAME, False, False, False, None) + callback = mock.MagicMock(return_value=asyncio.sleep(0)) + with mock.patch.object( + CallbackDecorator, "_get_span" + ) as mocked_get_callback_span: + callback_decorator = CallbackDecorator(self.tracer, queue) + decorated_callback = callback_decorator.decorate(callback) + self.loop.run_until_complete(decorated_callback(MESSAGE)) + mocked_get_callback_span.assert_called_once() + callback.assert_called_once_with(MESSAGE) + +class TestInstrumentedQueue(TestCase): + EXPECTED_ATTRIBUTES = { + SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME, + SpanAttributes.NET_PEER_NAME: SERVER_HOST, + SpanAttributes.NET_PEER_PORT: SERVER_PORT, + SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, + SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, + SpanAttributes.MESSAGING_OPERATION: "receive", + } + + def setUp(self): + self.tracer = get_tracer(__name__) + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def test_get_callback_span(self): + queue = Queue(CHANNEL_8, QUEUE_NAME, False, False, False, None) + tracer = mock.MagicMock() + CallbackDecorator(tracer, queue)._get_span(MESSAGE) + tracer.start_span.assert_called_once_with( + f"{EXCHANGE_NAME} receive", + kind=SpanKind.CONSUMER, + attributes=self.EXPECTED_ATTRIBUTES, + ) + + def test_decorate_callback(self): + queue = Queue(CHANNEL_8, QUEUE_NAME, False, False, False, None) callback = mock.MagicMock(return_value=asyncio.sleep(0)) with mock.patch.object( CallbackDecorator, "_get_span" diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py index 80dfa3182b..11576bfcc5 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py @@ -24,8 +24,10 @@ from opentelemetry.trace import SpanKind, get_tracer from .consts import ( - CHANNEL, - CONNECTION, + CHANNEL_7, + CHANNEL_8, + CONNECTION_7, + CONNECTION_8, CORRELATION_ID, EXCHANGE_NAME, MESSAGE, @@ -37,7 +39,7 @@ ) -class TestInstrumentedExchange(TestCase): +class TestInstrumentedExchangeAioRmq7(TestCase): EXPECTED_ATTRIBUTES = { SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}", @@ -54,7 +56,7 @@ def setUp(self): asyncio.set_event_loop(self.loop) def test_get_publish_span(self): - exchange = Exchange(CONNECTION, CHANNEL, EXCHANGE_NAME) + exchange = Exchange(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME) tracer = mock.MagicMock() PublishDecorator(tracer, exchange)._get_publish_span( MESSAGE, ROUTING_KEY @@ -66,7 +68,59 @@ def test_get_publish_span(self): ) def _test_publish(self, exchange_type: Type[Exchange]): - exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME) + exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME) + with mock.patch.object( + PublishDecorator, "_get_publish_span" + ) as mock_get_publish_span: + with mock.patch.object( + Exchange, "publish", return_value=asyncio.sleep(0) + ) as mock_publish: + decorated_publish = PublishDecorator( + self.tracer, exchange + ).decorate(mock_publish) + self.loop.run_until_complete( + decorated_publish(MESSAGE, ROUTING_KEY) + ) + mock_publish.assert_called_once() + mock_get_publish_span.assert_called_once() + + def test_publish(self): + self._test_publish(Exchange) + + def test_robust_publish(self): + self._test_publish(RobustExchange) + + +class TestInstrumentedExchangeAioRmq8(TestCase): + EXPECTED_ATTRIBUTES = { + SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}", + SpanAttributes.NET_PEER_NAME: SERVER_HOST, + SpanAttributes.NET_PEER_PORT: SERVER_PORT, + SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, + SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, + SpanAttributes.MESSAGING_TEMP_DESTINATION: True, + } + + def setUp(self): + self.tracer = get_tracer(__name__) + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def test_get_publish_span(self): + exchange = Exchange(CONNECTION_8, CHANNEL_8, EXCHANGE_NAME) + tracer = mock.MagicMock() + PublishDecorator(tracer, exchange)._get_publish_span( + MESSAGE, ROUTING_KEY + ) + tracer.start_span.assert_called_once_with( + f"{EXCHANGE_NAME},{ROUTING_KEY} send", + kind=SpanKind.PRODUCER, + attributes=self.EXPECTED_ATTRIBUTES, + ) + + def _test_publish(self, exchange_type: Type[Exchange]): + exchange = exchange_type(CONNECTION_8, CHANNEL_8, EXCHANGE_NAME) with mock.patch.object( PublishDecorator, "_get_publish_span" ) as mock_get_publish_span: diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index 36fda70ab1..20c5a0b725 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -17,7 +17,7 @@ libraries = { "aio_pika": { - "library": "aio_pika ~= 7.2.0", + "library": "aio_pika >= 7.2.0, < 9.0.0", "instrumentation": "opentelemetry-instrumentation-aio-pika==0.37b0.dev", }, "aiohttp": { diff --git a/tox.ini b/tox.ini index 4cd6fa02f5..63e5459c93 100644 --- a/tox.ini +++ b/tox.ini @@ -203,6 +203,10 @@ envlist = py3{7,8,9,10,11}-test-instrumentation-pika{0,1} pypy3-test-instrumentation-pika{0,1} + ; opentelemetry-instrumentation-aio-pika + py3{7,8,9,10,11}-test-instrumentation-aio-pika{7,8} + pypy3-test-instrumentation-aio-pika{7,8} + ; opentelemetry-instrumentation-kafka-python py3{7,8,9,10,11}-test-instrumentation-kafka-python pypy3-test-instrumentation-kafka-python @@ -247,6 +251,8 @@ deps = sqlalchemy14: sqlalchemy~=1.4 pika0: pika>=0.12.0,<1.0.0 pika1: pika>=1.0.0 + aio-pika7: aio_pika~=7.2.0 + aio-pika8: aio_pika>=8.0.0,<9.0.0 pymemcache135: pymemcache ==1.3.5 pymemcache200: pymemcache >2.0.0,<3.0.0 pymemcache300: pymemcache >3.0.0,<3.4.2 @@ -292,6 +298,7 @@ changedir = test-instrumentation-logging: instrumentation/opentelemetry-instrumentation-logging/tests test-instrumentation-mysql: instrumentation/opentelemetry-instrumentation-mysql/tests test-instrumentation-pika{0,1}: instrumentation/opentelemetry-instrumentation-pika/tests + test-instrumentation-aio-pika{7,8}: instrumentation/opentelemetry-instrumentation-aio-pika/tests test-instrumentation-psycopg2: instrumentation/opentelemetry-instrumentation-psycopg2/tests test-instrumentation-pymemcache{135,200,300,342}: instrumentation/opentelemetry-instrumentation-pymemcache/tests test-instrumentation-pymongo: instrumentation/opentelemetry-instrumentation-pymongo/tests @@ -333,6 +340,8 @@ commands_pre = pika{0,1}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test] + aio-pika{7,8}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-aio-pika[test] + kafka-python: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-kafka-python[test] confluent-kafka: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-confluent-kafka[test]