From 7ffbfc302e45e333d491a69f441571047b43b552 Mon Sep 17 00:00:00 2001 From: Shalev Roda <65566801+shalevr@users.noreply.github.com> Date: Sun, 26 Feb 2023 16:51:42 +0200 Subject: [PATCH] Add metrics instrumentation sqlalchemy (#1645) --- CHANGELOG.md | 3 + instrumentation/README.md | 2 +- .../instrumentation/sqlalchemy/__init__.py | 33 +++- .../instrumentation/sqlalchemy/engine.py | 86 +++++++--- .../instrumentation/sqlalchemy/package.py | 4 +- .../tests/test_sqlalchemy_metrics.py | 159 ++++++++++++++++++ 6 files changed, 250 insertions(+), 37 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy_metrics.py diff --git a/CHANGELOG.md b/CHANGELOG.md index ccb1a6626f..0537ee147e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add metrics instrumentation for sqlalchemy + ([#1645](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1645)) + - Fix exception in Urllib3 when dealing with filelike body. ([#1399](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1399)) diff --git a/instrumentation/README.md b/instrumentation/README.md index aa71744761..fd3a0258cf 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -34,7 +34,7 @@ | [opentelemetry-instrumentation-remoulade](./opentelemetry-instrumentation-remoulade) | remoulade >= 0.50 | No | [opentelemetry-instrumentation-requests](./opentelemetry-instrumentation-requests) | requests ~= 2.0 | Yes | [opentelemetry-instrumentation-sklearn](./opentelemetry-instrumentation-sklearn) | scikit-learn ~= 0.24.0 | No -| [opentelemetry-instrumentation-sqlalchemy](./opentelemetry-instrumentation-sqlalchemy) | sqlalchemy | No +| [opentelemetry-instrumentation-sqlalchemy](./opentelemetry-instrumentation-sqlalchemy) | sqlalchemy | Yes | [opentelemetry-instrumentation-sqlite3](./opentelemetry-instrumentation-sqlite3) | sqlite3 | No | [opentelemetry-instrumentation-starlette](./opentelemetry-instrumentation-starlette) | starlette ~= 0.13.0 | Yes | [opentelemetry-instrumentation-system-metrics](./opentelemetry-instrumentation-system-metrics) | psutil >= 5 | No diff --git a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/__init__.py b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/__init__.py index b19de5ec96..77db23b417 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/__init__.py @@ -105,13 +105,16 @@ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.sqlalchemy.engine import ( EngineTracer, - _get_tracer, _wrap_connect, _wrap_create_async_engine, _wrap_create_engine, ) from opentelemetry.instrumentation.sqlalchemy.package import _instruments +from opentelemetry.instrumentation.sqlalchemy.version import __version__ from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.metrics import get_meter +from opentelemetry.semconv.metrics import MetricInstruments +from opentelemetry.trace import get_tracer class SQLAlchemyInstrumentor(BaseInstrumentor): @@ -136,32 +139,47 @@ def _instrument(self, **kwargs): An instrumented engine if passed in as an argument or list of instrumented engines, None otherwise. """ tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(__name__, __version__, tracer_provider) + + meter_provider = kwargs.get("meter_provider") + meter = get_meter(__name__, __version__, meter_provider) + + connections_usage = meter.create_up_down_counter( + name=MetricInstruments.DB_CLIENT_CONNECTIONS_USAGE, + unit="connections", + description="The number of connections that are currently in state described by the state attribute.", + ) + enable_commenter = kwargs.get("enable_commenter", False) + _w( "sqlalchemy", "create_engine", - _wrap_create_engine(tracer_provider, enable_commenter), + _wrap_create_engine(tracer, connections_usage, enable_commenter), ) _w( "sqlalchemy.engine", "create_engine", - _wrap_create_engine(tracer_provider, enable_commenter), + _wrap_create_engine(tracer, connections_usage, enable_commenter), ) _w( "sqlalchemy.engine.base", "Engine.connect", - _wrap_connect(tracer_provider), + _wrap_connect(tracer), ) if parse_version(sqlalchemy.__version__).release >= (1, 4): _w( "sqlalchemy.ext.asyncio", "create_async_engine", - _wrap_create_async_engine(tracer_provider, enable_commenter), + _wrap_create_async_engine( + tracer, connections_usage, enable_commenter + ), ) if kwargs.get("engine") is not None: return EngineTracer( - _get_tracer(tracer_provider), + tracer, kwargs.get("engine"), + connections_usage, kwargs.get("enable_commenter", False), kwargs.get("commenter_options", {}), ) @@ -170,8 +188,9 @@ def _instrument(self, **kwargs): ): return [ EngineTracer( - _get_tracer(tracer_provider), + tracer, engine, + connections_usage, kwargs.get("enable_commenter", False), kwargs.get("commenter_options", {}), ) diff --git a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py index 2afbd7a82f..ca691fc052 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py +++ b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py @@ -20,9 +20,6 @@ ) from opentelemetry import trace -from opentelemetry.instrumentation.sqlalchemy.package import ( - _instrumenting_module_name, -) from opentelemetry.instrumentation.sqlalchemy.version import __version__ from opentelemetry.instrumentation.sqlcommenter_utils import _add_sql_comment from opentelemetry.instrumentation.utils import _get_opentelemetry_values @@ -44,15 +41,9 @@ def _normalize_vendor(vendor): return vendor -def _get_tracer(tracer_provider=None): - return trace.get_tracer( - _instrumenting_module_name, - __version__, - tracer_provider=tracer_provider, - ) - - -def _wrap_create_async_engine(tracer_provider=None, enable_commenter=False): +def _wrap_create_async_engine( + tracer, connections_usage, enable_commenter=False +): # pylint: disable=unused-argument def _wrap_create_async_engine_internal(func, module, args, kwargs): """Trace the SQLAlchemy engine, creating an `EngineTracer` @@ -60,33 +51,26 @@ def _wrap_create_async_engine_internal(func, module, args, kwargs): """ engine = func(*args, **kwargs) EngineTracer( - _get_tracer(tracer_provider), engine.sync_engine, enable_commenter + tracer, engine.sync_engine, connections_usage, enable_commenter ) return engine return _wrap_create_async_engine_internal -def _wrap_create_engine(tracer_provider=None, enable_commenter=False): - # pylint: disable=unused-argument - def _wrap_create_engine_internal(func, module, args, kwargs): +def _wrap_create_engine(tracer, connections_usage, enable_commenter=False): + def _wrap_create_engine_internal(func, _module, args, kwargs): """Trace the SQLAlchemy engine, creating an `EngineTracer` object that will listen to SQLAlchemy events. """ engine = func(*args, **kwargs) - EngineTracer(_get_tracer(tracer_provider), engine, enable_commenter) + EngineTracer(tracer, engine, connections_usage, enable_commenter) return engine return _wrap_create_engine_internal -def _wrap_connect(tracer_provider=None): - tracer = trace.get_tracer( - _instrumenting_module_name, - __version__, - tracer_provider=tracer_provider, - ) - +def _wrap_connect(tracer): # pylint: disable=unused-argument def _wrap_connect_internal(func, module, args, kwargs): with tracer.start_as_current_span( @@ -107,10 +91,16 @@ class EngineTracer: _remove_event_listener_params = [] def __init__( - self, tracer, engine, enable_commenter=False, commenter_options=None + self, + tracer, + engine, + connections_usage, + enable_commenter=False, + commenter_options=None, ): self.tracer = tracer self.engine = engine + self.connections_usage = connections_usage self.vendor = _normalize_vendor(engine.name) self.enable_commenter = enable_commenter self.commenter_options = commenter_options if commenter_options else {} @@ -123,6 +113,49 @@ def __init__( engine, "after_cursor_execute", _after_cur_exec ) self._register_event_listener(engine, "handle_error", _handle_error) + self._register_event_listener(engine, "connect", self._pool_connect) + self._register_event_listener(engine, "close", self._pool_close) + self._register_event_listener(engine, "checkin", self._pool_checkin) + self._register_event_listener(engine, "checkout", self._pool_checkout) + + def _get_pool_name(self): + return self.engine.pool.logging_name or "" + + def _add_idle_to_connection_usage(self, value): + self.connections_usage.add( + value, + attributes={ + "pool.name": self._get_pool_name(), + "state": "idle", + }, + ) + + def _add_used_to_connection_usage(self, value): + self.connections_usage.add( + value, + attributes={ + "pool.name": self._get_pool_name(), + "state": "used", + }, + ) + + def _pool_connect(self, _dbapi_connection, _connection_record): + self._add_idle_to_connection_usage(1) + + def _pool_close(self, _dbapi_connection, _connection_record): + self._add_idle_to_connection_usage(-1) + + # Called when a connection returns to the pool. + def _pool_checkin(self, _dbapi_connection, _connection_record): + self._add_used_to_connection_usage(-1) + self._add_idle_to_connection_usage(1) + + # Called when a connection is retrieved from the Pool. + def _pool_checkout( + self, _dbapi_connection, _connection_record, _connection_proxy + ): + self._add_idle_to_connection_usage(-1) + self._add_used_to_connection_usage(1) @classmethod def _register_event_listener(cls, target, identifier, func, *args, **kw): @@ -153,9 +186,8 @@ def _operation_name(self, db_name, statement): return self.vendor return " ".join(parts) - # pylint: disable=unused-argument def _before_cur_exec( - self, conn, cursor, statement, params, context, executemany + self, conn, cursor, statement, params, context, _executemany ): attrs, found = _get_attributes_from_url(conn.engine.url) if not found: diff --git a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/package.py b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/package.py index f1f833287d..e3029f57b6 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/package.py +++ b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/package.py @@ -12,6 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -_instrumenting_module_name = "opentelemetry.instrumentation.sqlalchemy" - _instruments = ("sqlalchemy",) + +_supports_metrics = True diff --git a/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy_metrics.py b/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy_metrics.py new file mode 100644 index 0000000000..39e45945f7 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy_metrics.py @@ -0,0 +1,159 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sqlalchemy +from sqlalchemy.pool import QueuePool + +from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor +from opentelemetry.test.test_base import TestBase + + +class TestSqlalchemyMetricsInstrumentation(TestBase): + def setUp(self): + super().setUp() + SQLAlchemyInstrumentor().instrument( + tracer_provider=self.tracer_provider, + ) + + def tearDown(self): + super().tearDown() + SQLAlchemyInstrumentor().uninstrument() + + def assert_pool_idle_used_expected(self, pool_name, idle, used): + metrics = self.get_sorted_metrics() + self.assertEqual(len(metrics), 1) + self.assert_metric_expected( + metrics[0], + [ + self.create_number_data_point( + value=idle, + attributes={"pool.name": pool_name, "state": "idle"}, + ), + self.create_number_data_point( + value=used, + attributes={"pool.name": pool_name, "state": "used"}, + ), + ], + ) + + def test_metrics_one_connection(self): + pool_name = "pool_test_name" + engine = sqlalchemy.create_engine( + "sqlite:///:memory:", + pool_size=5, + poolclass=QueuePool, + pool_logging_name=pool_name, + ) + + metrics = self.get_sorted_metrics() + self.assertEqual(len(metrics), 0) + + with engine.connect(): + self.assert_pool_idle_used_expected( + pool_name=pool_name, idle=0, used=1 + ) + + # After the connection is closed + self.assert_pool_idle_used_expected( + pool_name=pool_name, idle=1, used=0 + ) + + def test_metrics_without_pool_name(self): + pool_name = "" + engine = sqlalchemy.create_engine( + "sqlite:///:memory:", + pool_size=5, + poolclass=QueuePool, + ) + + metrics = self.get_sorted_metrics() + self.assertEqual(len(metrics), 0) + + with engine.connect(): + self.assert_pool_idle_used_expected( + pool_name=pool_name, idle=0, used=1 + ) + + # After the connection is closed + self.assert_pool_idle_used_expected( + pool_name=pool_name, idle=1, used=0 + ) + + def test_metrics_two_connections(self): + pool_name = "pool_test_name" + engine = sqlalchemy.create_engine( + "sqlite:///:memory:", + pool_size=5, + poolclass=QueuePool, + pool_logging_name=pool_name, + ) + + metrics = self.get_sorted_metrics() + self.assertEqual(len(metrics), 0) + + with engine.connect(): + with engine.connect(): + self.assert_pool_idle_used_expected(pool_name, idle=0, used=2) + + # After the first connection is closed + self.assert_pool_idle_used_expected(pool_name, idle=1, used=1) + + # After the two connections are closed + self.assert_pool_idle_used_expected(pool_name, idle=2, used=0) + + def test_metrics_connections(self): + pool_name = "pool_test_name" + engine = sqlalchemy.create_engine( + "sqlite:///:memory:", + pool_size=5, + poolclass=QueuePool, + pool_logging_name=pool_name, + ) + + metrics = self.get_sorted_metrics() + self.assertEqual(len(metrics), 0) + + with engine.connect(): + with engine.connect(): + self.assert_pool_idle_used_expected( + pool_name=pool_name, idle=0, used=2 + ) + + # After the first connection is closed + self.assert_pool_idle_used_expected( + pool_name=pool_name, idle=1, used=1 + ) + + # Resume from idle to used + with engine.connect(): + self.assert_pool_idle_used_expected( + pool_name=pool_name, idle=0, used=2 + ) + + # After the two connections are closed + self.assert_pool_idle_used_expected( + pool_name=pool_name, idle=2, used=0 + ) + + def test_metric_uninstrument(self): + SQLAlchemyInstrumentor().uninstrument() + engine = sqlalchemy.create_engine( + "sqlite:///:memory:", + poolclass=QueuePool, + ) + + engine.connect() + + metrics = self.get_sorted_metrics() + self.assertEqual(len(metrics), 0)