From 39c8c4b1362c461b159026a615ea7c5bd5662545 Mon Sep 17 00:00:00 2001 From: Mario Jonke Date: Thu, 18 Aug 2022 07:16:34 +0200 Subject: [PATCH 1/4] boto3sqs: Fix various issues * do not use 'otel' prefix for propagation keys to make propagation compatible with other SQS instrumentations like Node.Js Inject propergator.fields keys into the MessageAttributeNames argument for 'receive_message' calls to retreive the corresponding message attributes * add 'messaging.url' span attribute to SQS spans * add boto3sqs instrumentation to tox.ini to run tests in CI * add some basic unit tests --- .../setup.cfg | 3 + .../instrumentation/boto3sqs/__init__.py | 146 +++++---- .../tests/test_boto3sqs_instrumentation.py | 309 +++++++++++++----- tox.ini | 8 + 4 files changed, 323 insertions(+), 143 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.cfg b/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.cfg index dd7ab17502..b6cd820a06 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.cfg @@ -46,10 +46,13 @@ package_dir= packages=find_namespace: install_requires = opentelemetry-api ~= 1.12 + opentelemetry-semantic-conventions == 0.33b0 + opentelemetry-instrumentation == 0.33b0 wrapt >= 1.0.0, < 2.0.0 [options.extras_require] test = + opentelemetry-test-utils == 0.33b0 [options.packages.find] where = src diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py index fa30816350..450541354e 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py @@ -29,7 +29,7 @@ Boto3SQSInstrumentor().instrument() """ import logging -from typing import Any, Collection, Dict, Generator, List, Optional +from typing import Any, Collection, Dict, Generator, List, Mapping, Optional import boto3 import botocore.client @@ -53,33 +53,31 @@ from .version import __version__ _logger = logging.getLogger(__name__) -# We use this prefix so we can request all instrumentation MessageAttributeNames with a wildcard, without harming -# existing filters -_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER: str = "otel." -_OTEL_IDENTIFIER_LENGTH = len(_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER) + +_IS_SQS_INSTRUMENTED_ATTRIBUTE = "_otel_boto3sqs_instrumented" class Boto3SQSGetter(Getter[CarrierT]): def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]: - value = carrier.get(f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", {}) - if not value: + msg_attr = carrier.get(key) + if not isinstance(msg_attr, Mapping): + return None + + value = msg_attr.get("StringValue") + if value is None: return None - return [value.get("StringValue")] + + return [value] def keys(self, carrier: CarrierT) -> List[str]: - return [ - key[_OTEL_IDENTIFIER_LENGTH:] - if key.startswith(_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER) - else key - for key in carrier.keys() - ] + return list(carrier.keys()) class Boto3SQSSetter(Setter[CarrierT]): def set(self, carrier: CarrierT, key: str, value: str) -> None: # This is a limitation defined by AWS for SQS MessageAttributes size if len(carrier.items()) < 10: - carrier[f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}"] = { + carrier[key] = { "StringValue": value, "DataType": "String", } @@ -145,6 +143,7 @@ def instrumentation_dependencies(self) -> Collection[str]: def _enrich_span( span: Span, queue_name: str, + queue_url: str, conversation_id: Optional[str] = None, operation: Optional[MessagingOperationValues] = None, message_id: Optional[str] = None, @@ -157,12 +156,12 @@ def _enrich_span( SpanAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.QUEUE.value, ) + span.set_attribute(SpanAttributes.MESSAGING_URL, queue_url) + if operation: span.set_attribute( SpanAttributes.MESSAGING_OPERATION, operation.value ) - else: - span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) if conversation_id: span.set_attribute( SpanAttributes.MESSAGING_CONVERSATION_ID, conversation_id @@ -190,15 +189,19 @@ def _extract_queue_name_from_url(queue_url: str) -> str: return queue_url.split("/")[-1] def _create_processing_span( - self, queue_name: str, receipt_handle: str, message: Dict[str, Any] + self, + queue_name: str, + queue_url: str, + receipt_handle: str, + message: Dict[str, Any], ) -> None: message_attributes = message.get("MessageAttributes", {}) links = [] ctx = propagate.extract(message_attributes, getter=boto3sqs_getter) - if ctx: - for item in ctx.values(): - if hasattr(item, "get_span_context"): - links.append(Link(context=item.get_span_context())) + parent_span_ctx = trace.get_current_span(ctx).get_span_context() + if parent_span_ctx.is_valid: + links.append(Link(context=parent_span_ctx)) + span = self._tracer.start_span( name=f"{queue_name} process", links=links, kind=SpanKind.CONSUMER ) @@ -208,11 +211,12 @@ def _create_processing_span( Boto3SQSInstrumentor._enrich_span( span, queue_name, + queue_url, message_id=message_id, operation=MessagingOperationValues.PROCESS, ) - def _wrap_send_message(self) -> None: + def _wrap_send_message(self, sqs_class: type) -> None: def send_wrapper(wrapped, instance, args, kwargs): if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return wrapped(*args, **kwargs) @@ -227,7 +231,7 @@ def send_wrapper(wrapped, instance, args, kwargs): kind=SpanKind.PRODUCER, end_on_exit=True, ) as span: - Boto3SQSInstrumentor._enrich_span(span, queue_name) + Boto3SQSInstrumentor._enrich_span(span, queue_name, queue_url) attributes = kwargs.pop("MessageAttributes", {}) propagate.inject(attributes, setter=boto3sqs_setter) retval = wrapped(*args, MessageAttributes=attributes, **kwargs) @@ -239,9 +243,9 @@ def send_wrapper(wrapped, instance, args, kwargs): ) return retval - wrap_function_wrapper(self._sqs_class, "send_message", send_wrapper) + wrap_function_wrapper(sqs_class, "send_message", send_wrapper) - def _wrap_send_message_batch(self) -> None: + def _wrap_send_message_batch(self, sqs_class: type) -> None: def send_batch_wrapper(wrapped, instance, args, kwargs): queue_url = kwargs.get("QueueUrl") entries = kwargs.get("Entries") @@ -260,12 +264,11 @@ def send_batch_wrapper(wrapped, instance, args, kwargs): for entry in entries: entry_id = entry["Id"] span = self._tracer.start_span( - name=f"{queue_name} send", - kind=SpanKind.PRODUCER, + name=f"{queue_name} send", kind=SpanKind.PRODUCER, ) ids_to_spans[entry_id] = span Boto3SQSInstrumentor._enrich_span( - span, queue_name, conversation_id=entry_id + span, queue_name, queue_url, conversation_id=entry_id ) with trace.use_span(span): if "MessageAttributes" not in entry: @@ -288,15 +291,15 @@ def send_batch_wrapper(wrapped, instance, args, kwargs): return retval wrap_function_wrapper( - self._sqs_class, "send_message_batch", send_batch_wrapper + sqs_class, "send_message_batch", send_batch_wrapper ) - def _wrap_receive_message(self) -> None: + def _wrap_receive_message(self, sqs_class: type) -> None: def receive_message_wrapper(wrapped, instance, args, kwargs): queue_url = kwargs.get("QueueUrl") message_attribute_names = kwargs.pop("MessageAttributeNames", []) - message_attribute_names.append( - f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}*" + message_attribute_names.extend( + propagate.get_global_textmap().fields ) queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url( queue_url @@ -309,6 +312,7 @@ def receive_message_wrapper(wrapped, instance, args, kwargs): Boto3SQSInstrumentor._enrich_span( span, queue_name, + queue_url, operation=MessagingOperationValues.RECEIVE, ) retval = wrapped( @@ -327,7 +331,7 @@ def receive_message_wrapper(wrapped, instance, args, kwargs): receipt_handle ) self._create_processing_span( - queue_name, receipt_handle, message + queue_name, queue_url, receipt_handle, message ) retval["Messages"] = Boto3SQSInstrumentor.ContextableList( messages @@ -335,10 +339,11 @@ def receive_message_wrapper(wrapped, instance, args, kwargs): return retval wrap_function_wrapper( - self._sqs_class, "receive_message", receive_message_wrapper + sqs_class, "receive_message", receive_message_wrapper ) - def _wrap_delete_message(self) -> None: + @staticmethod + def _wrap_delete_message(sqs_class: type) -> None: def delete_message_wrapper(wrapped, instance, args, kwargs): receipt_handle = kwargs.get("ReceiptHandle") if receipt_handle: @@ -346,10 +351,11 @@ def delete_message_wrapper(wrapped, instance, args, kwargs): return wrapped(*args, **kwargs) wrap_function_wrapper( - self._sqs_class, "delete_message", delete_message_wrapper + sqs_class, "delete_message", delete_message_wrapper ) - def _wrap_delete_message_batch(self) -> None: + @staticmethod + def _wrap_delete_message_batch(sqs_class: type) -> None: def delete_message_wrapper_batch(wrapped, instance, args, kwargs): entries = kwargs.get("Entries") for entry in entries: @@ -361,9 +367,7 @@ def delete_message_wrapper_batch(wrapped, instance, args, kwargs): return wrapped(*args, **kwargs) wrap_function_wrapper( - self._sqs_class, - "delete_message_batch", - delete_message_wrapper_batch, + sqs_class, "delete_message_batch", delete_message_wrapper_batch, ) def _wrap_client_creation(self) -> None: @@ -375,43 +379,43 @@ def _wrap_client_creation(self) -> None: def client_wrapper(wrapped, instance, args, kwargs): retval = wrapped(*args, **kwargs) - if not self._did_decorate: - self._decorate_sqs() + self._decorate_sqs(type(retval)) return retval wrap_function_wrapper(boto3, "client", client_wrapper) - def _decorate_sqs(self) -> None: + def _decorate_sqs(self, sqs_class: type) -> None: """ Since botocore creates classes on the fly using schemas, we try to find the class that inherits from the base class and is SQS to wrap. """ # We define SQS client as the only client that implements send_message_batch - sqs_class = [ - cls - for cls in botocore.client.BaseClient.__subclasses__() - if hasattr(cls, "send_message_batch") - ] - if sqs_class: - self._sqs_class = sqs_class[0] - self._did_decorate = True - self._wrap_send_message() - self._wrap_send_message_batch() - self._wrap_receive_message() - self._wrap_delete_message() - self._wrap_delete_message_batch() - - def _un_decorate_sqs(self) -> None: - if self._did_decorate: - unwrap(self._sqs_class, "send_message") - unwrap(self._sqs_class, "send_message_batch") - unwrap(self._sqs_class, "receive_message") - unwrap(self._sqs_class, "delete_message") - unwrap(self._sqs_class, "delete_message_batch") - self._did_decorate = False + if not hasattr(sqs_class, "send_message_batch"): + return + + if getattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, False): + return + + setattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, True) + + self._wrap_send_message(sqs_class) + self._wrap_send_message_batch(sqs_class) + self._wrap_receive_message(sqs_class) + self._wrap_delete_message(sqs_class) + self._wrap_delete_message_batch(sqs_class) + + @staticmethod + def _un_decorate_sqs(sqs_class: type) -> None: + if not getattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, False): + return + + unwrap(sqs_class, "send_message") + unwrap(sqs_class, "send_message_batch") + unwrap(sqs_class, "receive_message") + unwrap(sqs_class, "delete_message") + unwrap(sqs_class, "delete_message_batch") def _instrument(self, **kwargs: Dict[str, Any]) -> None: - self._did_decorate: bool = False self._tracer_provider: Optional[TracerProvider] = kwargs.get( "tracer_provider" ) @@ -419,8 +423,12 @@ def _instrument(self, **kwargs: Dict[str, Any]) -> None: __name__, __version__, self._tracer_provider ) self._wrap_client_creation() - self._decorate_sqs() + + for client_cls in botocore.client.BaseClient.__subclasses__(): + self._decorate_sqs(client_cls) def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: unwrap(boto3, "client") - self._un_decorate_sqs() + + for client_cls in botocore.client.BaseClient.__subclasses__(): + self._un_decorate_sqs(client_cls) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py index a42545a0d8..468095b53e 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py @@ -14,79 +14,72 @@ # pylint: disable=no-name-in-module -from unittest import TestCase +from contextlib import contextmanager +from typing import Any, Dict +from unittest import TestCase, mock import boto3 -import botocore.client +from botocore.awsrequest import AWSResponse from wrapt import BoundFunctionWrapper, FunctionWrapper from opentelemetry.instrumentation.boto3sqs import ( - _OPENTELEMETRY_ATTRIBUTE_IDENTIFIER, Boto3SQSGetter, Boto3SQSInstrumentor, Boto3SQSSetter, ) +from opentelemetry.semconv.trace import ( + MessagingDestinationKindValues, + MessagingOperationValues, + SpanAttributes, +) +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace import SpanKind +from opentelemetry.trace.span import Span, format_span_id, format_trace_id -# pylint: disable=attribute-defined-outside-init -class TestBoto3SQSInstrumentor(TestCase): - def define_sqs_mock(self) -> None: - # pylint: disable=R0201 - class SQSClientMock(botocore.client.BaseClient): - def send_message(self, *args, **kwargs): - ... - - def send_message_batch(self, *args, **kwargs): - ... - - def receive_message(self, *args, **kwargs): - ... +def _make_sqs_client(): + return boto3.client( + "sqs", + region_name="us-east-1", + aws_access_key_id="dummy", + aws_secret_access_key="dummy", + ) - def delete_message(self, *args, **kwargs): - ... - def delete_message_batch(self, *args, **kwargs): - ... +class TestBoto3SQSInstrumentor(TestCase): + def _assert_instrumented(self, client): + self.assertIsInstance(boto3.client, FunctionWrapper) + self.assertIsInstance(client.send_message, BoundFunctionWrapper) + self.assertIsInstance(client.send_message_batch, BoundFunctionWrapper) + self.assertIsInstance(client.receive_message, BoundFunctionWrapper) + self.assertIsInstance(client.delete_message, BoundFunctionWrapper) + self.assertIsInstance( + client.delete_message_batch, BoundFunctionWrapper + ) - self._boto_sqs_mock = SQSClientMock + @staticmethod + @contextmanager + def _active_instrumentor(): + Boto3SQSInstrumentor().instrument() + try: + yield + finally: + Boto3SQSInstrumentor().uninstrument() def test_instrument_api_before_client_init(self) -> None: - instrumentation = Boto3SQSInstrumentor() - - instrumentation.instrument() - self.assertTrue(isinstance(boto3.client, FunctionWrapper)) - instrumentation.uninstrument() + with self._active_instrumentor(): + client = _make_sqs_client() + self._assert_instrumented(client) def test_instrument_api_after_client_init(self) -> None: - self.define_sqs_mock() - instrumentation = Boto3SQSInstrumentor() + client = _make_sqs_client() + with self._active_instrumentor(): + self._assert_instrumented(client) - instrumentation.instrument() - self.assertTrue(isinstance(boto3.client, FunctionWrapper)) - self.assertTrue( - isinstance(self._boto_sqs_mock.send_message, BoundFunctionWrapper) - ) - self.assertTrue( - isinstance( - self._boto_sqs_mock.send_message_batch, BoundFunctionWrapper - ) - ) - self.assertTrue( - isinstance( - self._boto_sqs_mock.receive_message, BoundFunctionWrapper - ) - ) - self.assertTrue( - isinstance( - self._boto_sqs_mock.delete_message, BoundFunctionWrapper - ) - ) - self.assertTrue( - isinstance( - self._boto_sqs_mock.delete_message_batch, BoundFunctionWrapper - ) - ) - instrumentation.uninstrument() + def test_instrument_multiple_clients(self): + with self._active_instrumentor(): + self._assert_instrumented(_make_sqs_client()) + self._assert_instrumented(_make_sqs_client()) class TestBoto3SQSGetter(TestCase): @@ -101,29 +94,17 @@ def test_get_none(self) -> None: def test_get_value(self) -> None: key = "test" value = "value" - carrier = { - f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}": { - "StringValue": value, - "DataType": "String", - } - } + carrier = {key: {"StringValue": value, "DataType": "String"}} val = self.getter.get(carrier, key) self.assertEqual(val, [value]) def test_keys(self): - key1 = "test1" - value1 = "value1" - key2 = "test2" - value2 = "value2" carrier = { - f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key1}": { - "StringValue": value1, - "DataType": "String", - }, - key2: {"StringValue": value2, "DataType": "String"}, + "test1": {"StringValue": "value1", "DataType": "String"}, + "test2": {"StringValue": "value2", "DataType": "String"}, } keys = self.getter.keys(carrier) - self.assertEqual(keys, [key1, key2]) + self.assertEqual(keys, list(carrier.keys())) def test_keys_empty(self): keys = self.getter.keys({}) @@ -145,8 +126,188 @@ def test_simple(self): for dict_key, dict_val in carrier[original_key].items(): self.assertEqual(original_value[dict_key], dict_val) # Ensure the new key is added well - self.assertIn( - f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", carrier.keys() + self.assertEqual(carrier[key]["StringValue"], value) + + +class TestBoto3SQSInstrumentation(TestBase): + def setUp(self): + super().setUp() + self._reset_instrumentor() + Boto3SQSInstrumentor().instrument() + + self._client = _make_sqs_client() + self._queue_name = "MyQueue" + self._queue_url = f"https://sqs.us-east-1.amazonaws.com/123456789012/{self._queue_name}" + + def tearDown(self): + super().tearDown() + Boto3SQSInstrumentor().uninstrument() + self._reset_instrumentor() + + @staticmethod + def _reset_instrumentor(): + Boto3SQSInstrumentor.received_messages_spans.clear() + Boto3SQSInstrumentor.current_span_related_to_token = None + Boto3SQSInstrumentor.current_context_token = None + + @staticmethod + def _make_aws_response_func(response): + def _response_func(*args, **kwargs): + return AWSResponse("http://127.0.0.1", 200, {}, "{}"), response + + return _response_func + + @contextmanager + def _mocked_endpoint(self, response): + response_func = self._make_aws_response_func(response) + with mock.patch( + "botocore.endpoint.Endpoint.make_request", new=response_func + ): + yield + + def _assert_injected_span(self, msg_attrs: Dict[str, Any], span: Span): + trace_parent = msg_attrs["traceparent"]["StringValue"] + ctx = span.get_span_context() + self.assertEqual( + self._to_trace_parent(ctx.trace_id, ctx.span_id), + trace_parent.lower(), ) - new_value = carrier[f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}"] - self.assertEqual(new_value["StringValue"], value) + + def _default_span_attrs(self): + return { + SpanAttributes.MESSAGING_SYSTEM: "aws.sqs", + SpanAttributes.MESSAGING_DESTINATION: self._queue_name, + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + SpanAttributes.MESSAGING_URL: self._queue_url, + } + + @staticmethod + def _to_trace_parent(trace_id: int, span_id: int) -> str: + return f"00-{format_trace_id(trace_id)}-{format_span_id(span_id)}-01".lower() + + def _get_only_span(self): + spans = self.get_finished_spans() + self.assertEqual(1, len(spans)) + return spans[0] + + @staticmethod + def _make_message(message_id: str, body: str, receipt: str): + return { + "MessageId": message_id, + "ReceiptHandle": receipt, + "MD5OfBody": "777", + "Body": body, + "Attributes": {}, + "MD5OfMessageAttributes": "111", + "MessageAttributes": {}, + } + + def _add_trace_parent( + self, message: Dict[str, Any], trace_id: int, span_id: int + ): + message["MessageAttributes"]["traceparent"] = { + "StringValue": self._to_trace_parent(trace_id, span_id), + "DataType": "String", + } + + def test_send_message(self): + message_id = "123456789" + mock_response = { + "MD5OfMessageBody": "1234", + "MD5OfMessageAttributes": "5678", + "MD5OfMessageSystemAttributes": "9012", + "MessageId": message_id, + "SequenceNumber": "0", + } + + message_attrs = {} + + with self._mocked_endpoint(mock_response): + self._client.send_message( + QueueUrl=self._queue_url, + MessageBody="hello msg", + MessageAttributes=message_attrs, + ) + + span = self._get_only_span() + self.assertEqual(f"{self._queue_name} send", span.name) + self.assertEqual(SpanKind.PRODUCER, span.kind) + self.assertEqual( + { + SpanAttributes.MESSAGING_MESSAGE_ID: message_id, + **self._default_span_attrs(), + }, + span.attributes, + ) + self._assert_injected_span(message_attrs, span) + + def test_receive_message(self): + msg_def = { + "1": {"receipt": "01", "trace_id": 10, "span_id": 1}, + "2": {"receipt": "02", "trace_id": 20, "span_id": 2}, + } + + mock_response = {"Messages": []} + for msg_id, attrs in msg_def.items(): + message = self._make_message( + msg_id, f"hello {msg_id}", attrs["receipt"] + ) + self._add_trace_parent( + message, attrs["trace_id"], attrs["span_id"] + ) + mock_response["Messages"].append(message) + + message_attr_names = [] + + with self._mocked_endpoint(mock_response): + response = self._client.receive_message( + QueueUrl=self._queue_url, + MessageAttributeNames=message_attr_names, + ) + + self.assertIn("traceparent", message_attr_names) + + # receive span + span = self._get_only_span() + self.assertEqual(f"{self._queue_name} receive", span.name) + self.assertEqual(SpanKind.CONSUMER, span.kind) + self.assertEqual( + { + SpanAttributes.MESSAGING_OPERATION: MessagingOperationValues.RECEIVE.value, + **self._default_span_attrs(), + }, + span.attributes, + ) + + self.memory_exporter.clear() + + # processing spans + self.assertEqual(2, len(response["Messages"])) + for msg in response["Messages"]: + msg_id = msg["MessageId"] + attrs = msg_def[msg_id] + with self._mocked_endpoint(None): + self._client.delete_message( + QueueUrl=self._queue_url, ReceiptHandle=attrs["receipt"], + ) + + span = self._get_only_span() + self.assertEqual(f"{self._queue_name} process", span.name) + + # processing span attributes + self.assertEqual( + { + SpanAttributes.MESSAGING_MESSAGE_ID: msg_id, + SpanAttributes.MESSAGING_OPERATION: MessagingOperationValues.PROCESS.value, + **self._default_span_attrs(), + }, + span.attributes, + ) + + # processing span links + self.assertEqual(1, len(span.links)) + link = span.links[0] + self.assertEqual(attrs["trace_id"], link.context.trace_id) + self.assertEqual(attrs["span_id"], link.context.span_id) + + self.memory_exporter.clear() diff --git a/tox.ini b/tox.ini index 64210a8c0c..74d412b057 100644 --- a/tox.ini +++ b/tox.ini @@ -32,6 +32,10 @@ envlist = py3{6,7,8,9,10}-test-instrumentation-botocore pypy3-test-instrumentation-botocore + ; opentelemetry-instrumentation-boto3sqs + py3{6,7,8,9,10}-test-instrumentation-boto3sqs + pypy3-test-instrumentation-boto3sqs + ; opentelemetry-instrumentation-django ; Only officially supported Python versions are tested for each Django ; major release. Updated list can be found at: @@ -259,6 +263,7 @@ changedir = test-instrumentation-aws-lambda: instrumentation/opentelemetry-instrumentation-aws-lambda/tests test-instrumentation-boto: instrumentation/opentelemetry-instrumentation-boto/tests test-instrumentation-botocore: instrumentation/opentelemetry-instrumentation-botocore/tests + test-instrumentation-boto3sqs: instrumentation/opentelemetry-instrumentation-boto3sqs/tests test-instrumentation-celery: instrumentation/opentelemetry-instrumentation-celery/tests test-instrumentation-dbapi: instrumentation/opentelemetry-instrumentation-dbapi/tests test-instrumentation-django{1,2,3,4}: instrumentation/opentelemetry-instrumentation-django/tests @@ -328,6 +333,8 @@ commands_pre = boto: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-botocore[test] boto: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-boto[test] + boto3sqs: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-boto3sqs[test] + falcon{1,2,3}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-falcon[test] flask: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-flask[test] @@ -445,6 +452,7 @@ commands_pre = python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-dbapi[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-asgi[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-botocore[test] + python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-boto3sqs[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-django[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-starlette[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-grpc[test] From 6bfe41e51dbad0526bbfebcdf1011a11008789fc Mon Sep 17 00:00:00 2001 From: Mario Jonke Date: Thu, 18 Aug 2022 08:19:22 +0200 Subject: [PATCH 2/4] changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9b4c7bba7..6f591fc7b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations and add 'messaging.url' span attribute + ([#1234](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1234)) + ## [1.12.0-0.33b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0-0.33b0) - 2022-08-08 - Adding multiple db connections support for django-instrumentation's sqlcommenter From 03f9e8e1f34e260f4011a926920b334951995814 Mon Sep 17 00:00:00 2001 From: Mario Jonke Date: Thu, 18 Aug 2022 08:49:03 +0200 Subject: [PATCH 3/4] fix linting issues --- .../src/opentelemetry/instrumentation/boto3sqs/__init__.py | 4 ++-- .../tests/test_boto3sqs_instrumentation.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py index 450541354e..25ff96d5e5 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py @@ -264,7 +264,7 @@ def send_batch_wrapper(wrapped, instance, args, kwargs): for entry in entries: entry_id = entry["Id"] span = self._tracer.start_span( - name=f"{queue_name} send", kind=SpanKind.PRODUCER, + name=f"{queue_name} send", kind=SpanKind.PRODUCER ) ids_to_spans[entry_id] = span Boto3SQSInstrumentor._enrich_span( @@ -367,7 +367,7 @@ def delete_message_wrapper_batch(wrapped, instance, args, kwargs): return wrapped(*args, **kwargs) wrap_function_wrapper( - sqs_class, "delete_message_batch", delete_message_wrapper_batch, + sqs_class, "delete_message_batch", delete_message_wrapper_batch ) def _wrap_client_creation(self) -> None: diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py index 468095b53e..102e40693c 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py @@ -288,7 +288,7 @@ def test_receive_message(self): attrs = msg_def[msg_id] with self._mocked_endpoint(None): self._client.delete_message( - QueueUrl=self._queue_url, ReceiptHandle=attrs["receipt"], + QueueUrl=self._queue_url, ReceiptHandle=attrs["receipt"] ) span = self._get_only_span() From 453242e1b2d97bf02d90f9c47055de1c07d70b23 Mon Sep 17 00:00:00 2001 From: Mario Jonke Date: Mon, 22 Aug 2022 15:31:03 +0200 Subject: [PATCH 4/4] unset instrumented flag on uninstrument --- CHANGELOG.md | 2 +- .../src/opentelemetry/instrumentation/boto3sqs/__init__.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f591fc7b1..acd7011afa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations and add 'messaging.url' span attribute +- `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations, add 'messaging.url' span attribute, and fix missing package dependencies. ([#1234](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1234)) ## [1.12.0-0.33b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0-0.33b0) - 2022-08-08 diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py index 25ff96d5e5..dacb931db6 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py @@ -415,6 +415,8 @@ def _un_decorate_sqs(sqs_class: type) -> None: unwrap(sqs_class, "delete_message") unwrap(sqs_class, "delete_message_batch") + setattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, False) + def _instrument(self, **kwargs: Dict[str, Any]) -> None: self._tracer_provider: Optional[TracerProvider] = kwargs.get( "tracer_provider"