Skip to content

Commit

Permalink
mongo db - fix db statement capturing (#1512)
Browse files Browse the repository at this point in the history
  • Loading branch information
avzis authored Jan 24, 2023
1 parent e1a1bad commit 810d982
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 16 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1555](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1555))
- `opentelemetry-instrumentation-asgi` Fix keys() in class ASGIGetter to correctly fetch values from carrier headers.
([#1435](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1435))

- mongo db - fix db statement capturing
([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512))

## Version 1.15.0/0.36b0 (2022-12-10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
failed_hook (Callable) -
a function with extra user-defined logic to be performed after the query returns with a failed response
this function signature is: def failed_hook(span: Span, event: CommandFailedEvent) -> None
capture_statement (bool) - an optional value to enable capturing the database statement that is being executed
for example:
Expand Down Expand Up @@ -81,6 +82,9 @@ def failed_hook(span, event):
from opentelemetry import context
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.pymongo.package import _instruments
from opentelemetry.instrumentation.pymongo.utils import (
COMMAND_TO_ATTRIBUTE_MAPPING,
)
from opentelemetry.instrumentation.pymongo.version import __version__
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
Expand All @@ -106,30 +110,29 @@ def __init__(
request_hook: RequestHookT = dummy_callback,
response_hook: ResponseHookT = dummy_callback,
failed_hook: FailedHookT = dummy_callback,
capture_statement: bool = False,
):
self._tracer = tracer
self._span_dict = {}
self.is_enabled = True
self.start_hook = request_hook
self.success_hook = response_hook
self.failed_hook = failed_hook
self.capture_statement = capture_statement

def started(self, event: monitoring.CommandStartedEvent):
"""Method to handle a pymongo CommandStartedEvent"""
if not self.is_enabled or context.get_value(
_SUPPRESS_INSTRUMENTATION_KEY
):
return
command = event.command.get(event.command_name, "")
name = event.database_name
name += "." + event.command_name
statement = event.command_name
if command:
statement += " " + str(command)
command_name = event.command_name
span_name = f"{event.database_name}.{command_name}"
statement = self._get_statement_by_command_name(command_name, event)
collection = event.command.get(event.command_name)

try:
span = self._tracer.start_span(name, kind=SpanKind.CLIENT)
span = self._tracer.start_span(span_name, kind=SpanKind.CLIENT)
if span.is_recording():
span.set_attribute(
SpanAttributes.DB_SYSTEM, DbSystemValues.MONGODB.value
Expand Down Expand Up @@ -196,6 +199,14 @@ def failed(self, event: monitoring.CommandFailedEvent):
def _pop_span(self, event):
return self._span_dict.pop(_get_span_dict_key(event), None)

def _get_statement_by_command_name(self, command_name, event):
statement = command_name
command_attribute = COMMAND_TO_ATTRIBUTE_MAPPING.get(command_name)
command = event.command.get(command_attribute)
if command and self.capture_statement:
statement += " " + str(command)
return statement


def _get_span_dict_key(event):
if event.connection_id is not None:
Expand Down Expand Up @@ -228,6 +239,7 @@ def _instrument(self, **kwargs):
request_hook = kwargs.get("request_hook", dummy_callback)
response_hook = kwargs.get("response_hook", dummy_callback)
failed_hook = kwargs.get("failed_hook", dummy_callback)
capture_statement = kwargs.get("capture_statement")
# Create and register a CommandTracer only the first time
if self._commandtracer_instance is None:
tracer = get_tracer(__name__, __version__, tracer_provider)
Expand All @@ -237,6 +249,7 @@ def _instrument(self, **kwargs):
request_hook=request_hook,
response_hook=response_hook,
failed_hook=failed_hook,
capture_statement=capture_statement,
)
monitoring.register(self._commandtracer_instance)
# If already created, just enable it
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

COMMAND_TO_ATTRIBUTE_MAPPING = {
"insert": "documents",
"delete": "deletes",
"update": "updates",
"find": "filter",
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_started(self):
span.attributes[SpanAttributes.DB_NAME], "database_name"
)
self.assertEqual(
span.attributes[SpanAttributes.DB_STATEMENT], "command_name find"
span.attributes[SpanAttributes.DB_STATEMENT], "command_name"
)
self.assertEqual(
span.attributes[SpanAttributes.NET_PEER_NAME], "test.com"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def setUp(self):
self.instrumentor = PymongoInstrumentor()
self.instrumentor.instrument()
self.instrumentor._commandtracer_instance._tracer = self._tracer
self.instrumentor._commandtracer_instance.capture_statement = True
client = MongoClient(
MONGODB_HOST, MONGODB_PORT, serverSelectionTimeoutMS=2000
)
Expand All @@ -44,7 +45,7 @@ def tearDown(self):
self.instrumentor.uninstrument()
super().tearDown()

def validate_spans(self):
def validate_spans(self, expected_db_statement):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
for span in spans:
Expand Down Expand Up @@ -72,34 +73,65 @@ def validate_spans(self):
pymongo_span.attributes[SpanAttributes.DB_MONGODB_COLLECTION],
MONGODB_COLLECTION_NAME,
)
self.assertEqual(
pymongo_span.attributes[SpanAttributes.DB_STATEMENT],
expected_db_statement,
)

def test_insert(self):
"""Should create a child span for insert"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.insert_one(
insert_result = self._collection.insert_one(
{"name": "testName", "value": "testValue"}
)
self.validate_spans()
insert_result_id = insert_result.inserted_id

expected_db_statement = (
f"insert [{{'name': 'testName', 'value': 'testValue', '_id': "
f"ObjectId('{insert_result_id}')}}]"
)
self.validate_spans(expected_db_statement)

def test_update(self):
"""Should create a child span for update"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.update_one(
{"name": "testName"}, {"$set": {"value": "someOtherValue"}}
)
self.validate_spans()

expected_db_statement = (
"update [SON([('q', {'name': 'testName'}), ('u', "
"{'$set': {'value': 'someOtherValue'}}), ('multi', False), ('upsert', False)])]"
)
self.validate_spans(expected_db_statement)

def test_find(self):
"""Should create a child span for find"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.find_one()
self.validate_spans()
self._collection.find_one({"name": "testName"})

expected_db_statement = "find {'name': 'testName'}"
self.validate_spans(expected_db_statement)

def test_delete(self):
"""Should create a child span for delete"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.delete_one({"name": "testName"})
self.validate_spans()

expected_db_statement = (
"delete [SON([('q', {'name': 'testName'}), ('limit', 1)])]"
)
self.validate_spans(expected_db_statement)

def test_find_without_capture_statement(self):
"""Should create a child span for find"""
self.instrumentor._commandtracer_instance.capture_statement = False

with self._tracer.start_as_current_span("rootSpan"):
self._collection.find_one({"name": "testName"})

expected_db_statement = "find"
self.validate_spans(expected_db_statement)

def test_uninstrument(self):
# check that integration is working
Expand Down

0 comments on commit 810d982

Please sign in to comment.