diff --git a/CHANGELOG.md b/CHANGELOG.md index ba513aebfa..4331bb599c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `opentelemetry-instrumentation-redis` Add `sanitize_query` config option to allow query sanitization. ([#1572](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1572)) +- `opentelemetry-instrumentation-elasticsearch` Add optional db.statement query sanitization. + ([#1598](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1598)) - `opentelemetry-instrumentation-celery` Record exceptions as events on the span. ([#1573](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1573)) - Add metric instrumentation for urllib diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py index 63b1fb25b1..4fd6bc79e1 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py @@ -44,6 +44,7 @@ The instrument() method accepts the following keyword args: tracer_provider (TracerProvider) - an optional tracer provider +sanitize_query (bool) - an optional query sanitization flag request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request this function signature is: def request_hook(span: Span, method: str, url: str, kwargs) @@ -96,6 +97,8 @@ def response_hook(span, response): from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace import SpanKind, get_tracer +from .utils import sanitize_body + logger = getLogger(__name__) @@ -135,11 +138,16 @@ def 
_instrument(self, **kwargs): tracer = get_tracer(__name__, __version__, tracer_provider) request_hook = kwargs.get("request_hook") response_hook = kwargs.get("response_hook") + sanitize_query = kwargs.get("sanitize_query", False) _wrap( elasticsearch, "Transport.perform_request", _wrap_perform_request( - tracer, self._span_name_prefix, request_hook, response_hook + tracer, + sanitize_query, + self._span_name_prefix, + request_hook, + response_hook, ), ) @@ -154,7 +162,11 @@ def _uninstrument(self, **kwargs): def _wrap_perform_request( - tracer, span_name_prefix, request_hook=None, response_hook=None + tracer, + sanitize_query, + span_name_prefix, + request_hook=None, + response_hook=None, ): # pylint: disable=R0912,R0914 def wrapper(wrapped, _, args, kwargs): @@ -213,7 +225,10 @@ def wrapper(wrapped, _, args, kwargs): if method: attributes["elasticsearch.method"] = method if body: - attributes[SpanAttributes.DB_STATEMENT] = str(body) + statement = str(body) + if sanitize_query: + statement = sanitize_body(body) + attributes[SpanAttributes.DB_STATEMENT] = statement if params: attributes["elasticsearch.params"] = str(params) if doc_id: diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/utils.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/utils.py new file mode 100644 index 0000000000..7c5d753b31 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/utils.py @@ -0,0 +1,59 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
"""Helpers for scrubbing Elasticsearch request bodies before they are
recorded as the ``db.statement`` span attribute."""

import json

# A value is masked when its key *ends with* one of these names (suffix
# match, so e.g. "intervals" and "my_query" are both covered).
sanitized_keys = (
    "message",
    "should",
    "filter",
    "query",
    "queries",
    "intervals",
    "match",
)
# Placeholder written in place of every masked value.
sanitized_value = "?"


def _sanitize_dict(mapping):
    """Return a sanitized copy of ``mapping``; the input is not mutated.

    Non-empty nested dicts are descended into. Every other value (scalars,
    lists, empty dicts) is a leaf: it is replaced by ``sanitized_value``
    when its key ends with one of ``sanitized_keys`` and kept otherwise.
    Lists are deliberately treated as opaque leaves and not descended into.
    """
    result = {}
    for key, value in mapping.items():
        if isinstance(value, dict) and value:
            # Only leaves are masked; a non-empty dict is always recursed
            # into, even when its own key is in ``sanitized_keys``.
            result[key] = _sanitize_dict(value)
        elif str(key).endswith(sanitized_keys):
            result[key] = sanitized_value
        else:
            # Includes empty dicts (e.g. ``{"match_all": {}}``), which must
            # survive instead of silently disappearing from the statement.
            result[key] = value
    return result


def _load_documents(text):
    """Parse ``text`` as one JSON document or as newline-delimited JSON.

    Returns a list of decoded documents, or ``None`` if ``text`` is neither.
    """
    try:
        return [json.loads(text)]
    except ValueError:
        pass
    # Bulk-style payloads carry one JSON document per line.
    try:
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    except ValueError:
        return None


def _render(document) -> str:
    """Render one decoded document as a sanitized statement string."""
    if isinstance(document, dict):
        return str(_sanitize_dict(document))
    # Unknown structure: mask it entirely rather than risk leaking values.
    return sanitized_value


def sanitize_body(body) -> str:
    """Return a string form of ``body`` with sensitive values masked.

    Args:
        body: The request body. Usually a ``dict``; ``str`` / ``bytes``
            bodies holding JSON or newline-delimited JSON are decoded
            first so they can be sanitized as well.

    Returns:
        ``str()`` of the sanitized dict. For newline-delimited JSON, one
        sanitized document per line. ``sanitized_value`` when the body
        cannot be interpreted as a JSON object, so that unparsed content
        is never recorded verbatim.
    """
    if isinstance(body, (bytes, bytearray)):
        body = body.decode("utf-8", errors="replace")

    if isinstance(body, str):
        documents = _load_documents(body)
        if documents is None:
            return sanitized_value
        return "\n".join(_render(document) for document in documents)

    return _render(body)
+match_query = {"query": {"match": {"message": {"query": "this is a test"}}}} + +filter_query = { + "query": { + "bool": { + "must": [ + {"match": {"title": "Search"}}, + {"match": {"content": "Elasticsearch"}}, + ], + "filter": [ + {"term": {"status": "published"}}, + {"range": {"publish_date": {"gte": "2015-01-01"}}}, + ], + } + } +} + +interval_query_sanitized = { + "query": { + "intervals": { + "my_text": {"all_of": {"ordered": True, "intervals": "?"}} + } + } +} +match_query_sanitized = {"query": {"match": {"message": {"query": "?"}}}} +filter_query_sanitized = { + "query": { + "bool": { + "must": [ + {"match": {"title": "Search"}}, + {"match": {"content": "Elasticsearch"}}, + ], + "filter": "?", + } + } +} diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py index c327f90474..f7168b39d2 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import json import os import threading @@ -27,10 +28,13 @@ from opentelemetry.instrumentation.elasticsearch import ( ElasticsearchInstrumentor, ) +from opentelemetry.instrumentation.elasticsearch.utils import sanitize_body from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.test_base import TestBase from opentelemetry.trace import StatusCode +from . import sanitization_queries # pylint: disable=no-name-in-module + major_version = elasticsearch.VERSION[0] if major_version == 7: @@ -42,7 +46,6 @@ else: from . 
import helpers_es2 as helpers # pylint: disable=no-name-in-module - Article = helpers.Article @@ -50,6 +53,22 @@ "elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request" ) class TestElasticsearchIntegration(TestBase): + search_attributes = { + SpanAttributes.DB_SYSTEM: "elasticsearch", + "elasticsearch.url": "/test-index/_search", + "elasticsearch.method": helpers.dsl_search_method, + "elasticsearch.target": "test-index", + SpanAttributes.DB_STATEMENT: str( + {"query": {"bool": {"filter": [{"term": {"author": "testing"}}]}}} + ), + } + + create_attributes = { + SpanAttributes.DB_SYSTEM: "elasticsearch", + "elasticsearch.url": "/test-index", + "elasticsearch.method": "HEAD", + } + def setUp(self): super().setUp() self.tracer = self.tracer_provider.get_tracer(__name__) @@ -241,21 +260,36 @@ def test_dsl_search(self, request_mock): self.assertIsNotNone(span.end_time) self.assertEqual( span.attributes, + self.search_attributes, + ) + + def test_dsl_search_sanitized(self, request_mock): + # Reset instrumentation to use sanitized query (default) + ElasticsearchInstrumentor().uninstrument() + ElasticsearchInstrumentor().instrument(sanitize_query=True) + + # update expected attributes to match sanitized query + sanitized_search_attributes = self.search_attributes.copy() + sanitized_search_attributes.update( { - SpanAttributes.DB_SYSTEM: "elasticsearch", - "elasticsearch.url": "/test-index/_search", - "elasticsearch.method": helpers.dsl_search_method, - "elasticsearch.target": "test-index", - SpanAttributes.DB_STATEMENT: str( - { - "query": { - "bool": { - "filter": [{"term": {"author": "testing"}}] - } - } - } - ), - }, + SpanAttributes.DB_STATEMENT: "{'query': {'bool': {'filter': '?'}}}" + } + ) + + request_mock.return_value = (1, {}, '{"hits": {"hits": []}}') + client = Elasticsearch() + search = Search(using=client, index="test-index").filter( + "term", author="testing" + ) + search.execute() + spans = self.get_finished_spans() + span = spans[0] + 
self.assertEqual(1, len(spans)) + self.assertEqual(span.name, "Elasticsearch//_search") + self.assertIsNotNone(span.end_time) + self.assertEqual( + span.attributes, + sanitized_search_attributes, ) def test_dsl_create(self, request_mock): @@ -264,17 +298,14 @@ def test_dsl_create(self, request_mock): Article.init(using=client) spans = self.get_finished_spans() + assert spans self.assertEqual(2, len(spans)) span1 = spans.by_attr(key="elasticsearch.method", value="HEAD") span2 = spans.by_attr(key="elasticsearch.method", value="PUT") self.assertEqual( span1.attributes, - { - SpanAttributes.DB_SYSTEM: "elasticsearch", - "elasticsearch.url": "/test-index", - "elasticsearch.method": "HEAD", - }, + self.create_attributes, ) attributes = { @@ -288,6 +319,25 @@ def test_dsl_create(self, request_mock): helpers.dsl_create_statement, ) + def test_dsl_create_sanitized(self, request_mock): + # Reset instrumentation to explicitly use sanitized query + ElasticsearchInstrumentor().uninstrument() + ElasticsearchInstrumentor().instrument(sanitize_query=True) + request_mock.return_value = (1, {}, {}) + client = Elasticsearch() + Article.init(using=client) + + spans = self.get_finished_spans() + assert spans + + self.assertEqual(2, len(spans)) + span = spans.by_attr(key="elasticsearch.method", value="HEAD") + + self.assertEqual( + span.attributes, + self.create_attributes, + ) + def test_dsl_index(self, request_mock): request_mock.return_value = helpers.dsl_index_result @@ -412,3 +462,17 @@ def response_hook(span, response): json.dumps(response_payload), spans[0].attributes[response_attribute_name], ) + + def test_body_sanitization(self, _): + self.assertEqual( + sanitize_body(sanitization_queries.interval_query), + str(sanitization_queries.interval_query_sanitized), + ) + self.assertEqual( + sanitize_body(sanitization_queries.match_query), + str(sanitization_queries.match_query_sanitized), + ) + self.assertEqual( + sanitize_body(sanitization_queries.filter_query), + 
str(sanitization_queries.filter_query_sanitized), + )