Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

boto3sqs: Make propagation compatible with other instrumentations and add 'messaging.url' span attribute #1234

Merged
merged 4 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations, add 'messaging.url' span attribute, and fix missing package dependencies.
([#1234](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1234))

## [1.12.0-0.33b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0-0.33b0) - 2022-08-08

- Adding multiple db connections support for django-instrumentation's sqlcommenter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ package_dir=
packages=find_namespace:
install_requires =
opentelemetry-api ~= 1.12
opentelemetry-semantic-conventions == 0.33b0
opentelemetry-instrumentation == 0.33b0
wrapt >= 1.0.0, < 2.0.0

[options.extras_require]
test =
opentelemetry-test-utils == 0.33b0

[options.packages.find]
where = src
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
Boto3SQSInstrumentor().instrument()
"""
import logging
from typing import Any, Collection, Dict, Generator, List, Optional
from typing import Any, Collection, Dict, Generator, List, Mapping, Optional

import boto3
import botocore.client
Expand All @@ -53,33 +53,31 @@
from .version import __version__

_logger = logging.getLogger(__name__)
# We use this prefix so we can request all instrumentation MessageAttributeNames with a wildcard, without harming
# existing filters
_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER: str = "otel."
_OTEL_IDENTIFIER_LENGTH = len(_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER)

_IS_SQS_INSTRUMENTED_ATTRIBUTE = "_otel_boto3sqs_instrumented"


class Boto3SQSGetter(Getter[CarrierT]):
def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]:
value = carrier.get(f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", {})
if not value:
msg_attr = carrier.get(key)
if not isinstance(msg_attr, Mapping):
return None

value = msg_attr.get("StringValue")
if value is None:
return None
return [value.get("StringValue")]

return [value]

def keys(self, carrier: CarrierT) -> List[str]:
return [
key[_OTEL_IDENTIFIER_LENGTH:]
if key.startswith(_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER)
else key
for key in carrier.keys()
]
return list(carrier.keys())


class Boto3SQSSetter(Setter[CarrierT]):
def set(self, carrier: CarrierT, key: str, value: str) -> None:
# This is a limitation defined by AWS for SQS MessageAttributes size
if len(carrier.items()) < 10:
oxeye-nikolay marked this conversation as resolved.
Show resolved Hide resolved
carrier[f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}"] = {
carrier[key] = {
"StringValue": value,
"DataType": "String",
}
Expand Down Expand Up @@ -145,6 +143,7 @@ def instrumentation_dependencies(self) -> Collection[str]:
def _enrich_span(
span: Span,
queue_name: str,
queue_url: str,
conversation_id: Optional[str] = None,
operation: Optional[MessagingOperationValues] = None,
message_id: Optional[str] = None,
Expand All @@ -157,12 +156,12 @@ def _enrich_span(
SpanAttributes.MESSAGING_DESTINATION_KIND,
MessagingDestinationKindValues.QUEUE.value,
)
span.set_attribute(SpanAttributes.MESSAGING_URL, queue_url)

if operation:
span.set_attribute(
SpanAttributes.MESSAGING_OPERATION, operation.value
)
else:
span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)
if conversation_id:
span.set_attribute(
SpanAttributes.MESSAGING_CONVERSATION_ID, conversation_id
Expand Down Expand Up @@ -190,15 +189,19 @@ def _extract_queue_name_from_url(queue_url: str) -> str:
return queue_url.split("/")[-1]

def _create_processing_span(
self, queue_name: str, receipt_handle: str, message: Dict[str, Any]
self,
queue_name: str,
queue_url: str,
receipt_handle: str,
message: Dict[str, Any],
) -> None:
message_attributes = message.get("MessageAttributes", {})
links = []
ctx = propagate.extract(message_attributes, getter=boto3sqs_getter)
if ctx:
for item in ctx.values():
if hasattr(item, "get_span_context"):
links.append(Link(context=item.get_span_context()))
parent_span_ctx = trace.get_current_span(ctx).get_span_context()
if parent_span_ctx.is_valid:
links.append(Link(context=parent_span_ctx))

span = self._tracer.start_span(
name=f"{queue_name} process", links=links, kind=SpanKind.CONSUMER
)
Expand All @@ -208,11 +211,12 @@ def _create_processing_span(
Boto3SQSInstrumentor._enrich_span(
span,
queue_name,
queue_url,
message_id=message_id,
operation=MessagingOperationValues.PROCESS,
)

def _wrap_send_message(self) -> None:
def _wrap_send_message(self, sqs_class: type) -> None:
def send_wrapper(wrapped, instance, args, kwargs):
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return wrapped(*args, **kwargs)
Expand All @@ -227,7 +231,7 @@ def send_wrapper(wrapped, instance, args, kwargs):
kind=SpanKind.PRODUCER,
end_on_exit=True,
) as span:
Boto3SQSInstrumentor._enrich_span(span, queue_name)
Boto3SQSInstrumentor._enrich_span(span, queue_name, queue_url)
attributes = kwargs.pop("MessageAttributes", {})
propagate.inject(attributes, setter=boto3sqs_setter)
retval = wrapped(*args, MessageAttributes=attributes, **kwargs)
Expand All @@ -239,9 +243,9 @@ def send_wrapper(wrapped, instance, args, kwargs):
)
return retval

wrap_function_wrapper(self._sqs_class, "send_message", send_wrapper)
wrap_function_wrapper(sqs_class, "send_message", send_wrapper)

def _wrap_send_message_batch(self) -> None:
def _wrap_send_message_batch(self, sqs_class: type) -> None:
def send_batch_wrapper(wrapped, instance, args, kwargs):
queue_url = kwargs.get("QueueUrl")
entries = kwargs.get("Entries")
Expand All @@ -260,12 +264,11 @@ def send_batch_wrapper(wrapped, instance, args, kwargs):
for entry in entries:
entry_id = entry["Id"]
span = self._tracer.start_span(
name=f"{queue_name} send",
kind=SpanKind.PRODUCER,
name=f"{queue_name} send", kind=SpanKind.PRODUCER
)
ids_to_spans[entry_id] = span
Boto3SQSInstrumentor._enrich_span(
span, queue_name, conversation_id=entry_id
span, queue_name, queue_url, conversation_id=entry_id
)
with trace.use_span(span):
if "MessageAttributes" not in entry:
Expand All @@ -288,15 +291,15 @@ def send_batch_wrapper(wrapped, instance, args, kwargs):
return retval

wrap_function_wrapper(
self._sqs_class, "send_message_batch", send_batch_wrapper
sqs_class, "send_message_batch", send_batch_wrapper
)

def _wrap_receive_message(self) -> None:
def _wrap_receive_message(self, sqs_class: type) -> None:
def receive_message_wrapper(wrapped, instance, args, kwargs):
queue_url = kwargs.get("QueueUrl")
message_attribute_names = kwargs.pop("MessageAttributeNames", [])
message_attribute_names.append(
f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}*"
message_attribute_names.extend(
propagate.get_global_textmap().fields
)
queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url(
queue_url
Expand All @@ -309,6 +312,7 @@ def receive_message_wrapper(wrapped, instance, args, kwargs):
Boto3SQSInstrumentor._enrich_span(
span,
queue_name,
queue_url,
operation=MessagingOperationValues.RECEIVE,
)
retval = wrapped(
Expand All @@ -327,29 +331,31 @@ def receive_message_wrapper(wrapped, instance, args, kwargs):
receipt_handle
)
self._create_processing_span(
queue_name, receipt_handle, message
queue_name, queue_url, receipt_handle, message
)
retval["Messages"] = Boto3SQSInstrumentor.ContextableList(
messages
)
return retval

wrap_function_wrapper(
self._sqs_class, "receive_message", receive_message_wrapper
sqs_class, "receive_message", receive_message_wrapper
)

def _wrap_delete_message(self) -> None:
@staticmethod
def _wrap_delete_message(sqs_class: type) -> None:
def delete_message_wrapper(wrapped, instance, args, kwargs):
receipt_handle = kwargs.get("ReceiptHandle")
if receipt_handle:
Boto3SQSInstrumentor._safe_end_processing_span(receipt_handle)
return wrapped(*args, **kwargs)

wrap_function_wrapper(
self._sqs_class, "delete_message", delete_message_wrapper
sqs_class, "delete_message", delete_message_wrapper
)

def _wrap_delete_message_batch(self) -> None:
@staticmethod
def _wrap_delete_message_batch(sqs_class: type) -> None:
def delete_message_wrapper_batch(wrapped, instance, args, kwargs):
entries = kwargs.get("Entries")
for entry in entries:
Expand All @@ -361,9 +367,7 @@ def delete_message_wrapper_batch(wrapped, instance, args, kwargs):
return wrapped(*args, **kwargs)

wrap_function_wrapper(
self._sqs_class,
"delete_message_batch",
delete_message_wrapper_batch,
sqs_class, "delete_message_batch", delete_message_wrapper_batch
)

def _wrap_client_creation(self) -> None:
Expand All @@ -375,52 +379,58 @@ def _wrap_client_creation(self) -> None:

def client_wrapper(wrapped, instance, args, kwargs):
retval = wrapped(*args, **kwargs)
if not self._did_decorate:
self._decorate_sqs()
self._decorate_sqs(type(retval))
return retval

wrap_function_wrapper(boto3, "client", client_wrapper)

def _decorate_sqs(self) -> None:
def _decorate_sqs(self, sqs_class: type) -> None:
"""
Since botocore creates classes on the fly using schemas, we try to find the class that inherits from the base
class and is SQS to wrap.
"""
# We define SQS client as the only client that implements send_message_batch
sqs_class = [
cls
for cls in botocore.client.BaseClient.__subclasses__()
if hasattr(cls, "send_message_batch")
]
if sqs_class:
self._sqs_class = sqs_class[0]
self._did_decorate = True
self._wrap_send_message()
self._wrap_send_message_batch()
self._wrap_receive_message()
self._wrap_delete_message()
self._wrap_delete_message_batch()

def _un_decorate_sqs(self) -> None:
if self._did_decorate:
unwrap(self._sqs_class, "send_message")
unwrap(self._sqs_class, "send_message_batch")
unwrap(self._sqs_class, "receive_message")
unwrap(self._sqs_class, "delete_message")
unwrap(self._sqs_class, "delete_message_batch")
self._did_decorate = False
if not hasattr(sqs_class, "send_message_batch"):
return

if getattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, False):
return
oxeye-nikolay marked this conversation as resolved.
Show resolved Hide resolved

setattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, True)

self._wrap_send_message(sqs_class)
self._wrap_send_message_batch(sqs_class)
self._wrap_receive_message(sqs_class)
self._wrap_delete_message(sqs_class)
self._wrap_delete_message_batch(sqs_class)

@staticmethod
def _un_decorate_sqs(sqs_class: type) -> None:
if not getattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, False):
return

unwrap(sqs_class, "send_message")
unwrap(sqs_class, "send_message_batch")
unwrap(sqs_class, "receive_message")
unwrap(sqs_class, "delete_message")
unwrap(sqs_class, "delete_message_batch")

setattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, False)

def _instrument(self, **kwargs: Dict[str, Any]) -> None:
self._did_decorate: bool = False
self._tracer_provider: Optional[TracerProvider] = kwargs.get(
"tracer_provider"
)
self._tracer: Tracer = trace.get_tracer(
__name__, __version__, self._tracer_provider
)
self._wrap_client_creation()
self._decorate_sqs()

for client_cls in botocore.client.BaseClient.__subclasses__():
self._decorate_sqs(client_cls)

def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
unwrap(boto3, "client")
self._un_decorate_sqs()

for client_cls in botocore.client.BaseClient.__subclasses__():
self._un_decorate_sqs(client_cls)
Loading