Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument RedisCluster clients #1177

Merged
merged 10 commits into from
Jul 7, 2022
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.12.0rc2-0.32b0...HEAD)

### Added
- `opentelemetry-instrumentation-redis` add support to instrument RedisCluster clients
([#1177](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1177))

## [1.12.0rc2-0.32b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc2-0.32b0) - 2022-07-01


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,12 @@ def response_hook(span, instance, response):
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
import redis.asyncio

_REDIS_CLUSTER_VERSION = (4, 1, 0)
_REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 0)


def _set_connection_attributes(span, conn):
if not span.is_recording():
if not span.is_recording() or not hasattr(conn, "connection_pool"):
return
for key, value in _extract_conn_attributes(
conn.connection_pool.connection_kwargs
Expand Down Expand Up @@ -159,10 +162,29 @@ def _traced_execute_command(func, instance, args, kwargs):
return response

def _traced_execute_pipeline(func, instance, args, kwargs):
cmds = [_format_command_args(c) for c, _ in instance.command_stack]
resource = "\n".join(cmds)
try:
command_stack = (
instance.command_stack
if hasattr(instance, "command_stack")
else instance._command_stack
)

span_name = " ".join([args[0] for args, _ in instance.command_stack])
cmds = [
_format_command_args(c.args if hasattr(c, "args") else c[0])
for c in command_stack
]
resource = "\n".join(cmds)

span_name = " ".join(
[
(c.args[0] if hasattr(c, "args") else c[0][0])
for c in command_stack
]
)
except (AttributeError, IndexError):
command_stack = []
resource = ""
span_name = ""

with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CLIENT
Expand All @@ -171,7 +193,7 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
span.set_attribute(SpanAttributes.DB_STATEMENT, resource)
_set_connection_attributes(span, instance)
span.set_attribute(
"db.redis.pipeline_length", len(instance.command_stack)
"db.redis.pipeline_length", len(command_stack)
)
response = func(*args, **kwargs)
if callable(response_hook):
Expand All @@ -196,6 +218,17 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
f"{pipeline_class}.immediate_execute_command",
_traced_execute_command,
)
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
wrap_function_wrapper(
"redis.cluster",
"RedisCluster.execute_command",
_traced_execute_command,
)
wrap_function_wrapper(
"redis.cluster",
"ClusterPipeline.execute",
_traced_execute_pipeline,
)
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
wrap_function_wrapper(
"redis.asyncio",
Expand All @@ -212,6 +245,17 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
f"{pipeline_class}.immediate_execute_command",
_traced_execute_command,
)
if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION:
wrap_function_wrapper(
"redis.asyncio.cluster",
"RedisCluster.execute_command",
_traced_execute_command,
)
wrap_function_wrapper(
"redis.asyncio.cluster",
"ClusterPipeline.execute",
_traced_execute_pipeline,
)


class RedisInstrumentor(BaseInstrumentor):
Expand Down Expand Up @@ -258,8 +302,14 @@ def _uninstrument(self, **kwargs):
unwrap(redis.Redis, "pipeline")
unwrap(redis.client.Pipeline, "execute")
unwrap(redis.client.Pipeline, "immediate_execute_command")
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
unwrap(redis.cluster.RedisCluster, "execute_command")
unwrap(redis.cluster.ClusterPipeline, "execute")
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
unwrap(redis.asyncio.Redis, "execute_command")
unwrap(redis.asyncio.Redis, "pipeline")
unwrap(redis.asyncio.client.Pipeline, "execute")
unwrap(redis.asyncio.client.Pipeline, "immediate_execute_command")
if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION:
unwrap(redis.asyncio.cluster.RedisCluster, "execute_command")
unwrap(redis.asyncio.cluster.ClusterPipeline, "execute")
11 changes: 11 additions & 0 deletions tests/opentelemetry-docker-tests/tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ services:
image: redis:4.0-alpine
ports:
- "127.0.0.1:6379:6379"
otrediscluster:
image: grokzen/redis-cluster:6.2.0
environment:
- IP=0.0.0.0
ports:
- "127.0.0.1:7000:7000"
- "127.0.0.1:7001:7001"
- "127.0.0.1:7002:7002"
- "127.0.0.1:7003:7003"
- "127.0.0.1:7004:7004"
- "127.0.0.1:7005:7005"
otjaeger:
image: jaegertracing/all-in-one:1.8
environment:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,72 @@ def test_parent(self):
self.assertEqual(child_span.name, "GET")


class TestRedisClusterInstrument(TestBase):
def setUp(self):
super().setUp()
self.redis_client = redis.cluster.RedisCluster(
host="localhost", port=7000
)
self.redis_client.flushall()
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)

def tearDown(self):
super().tearDown()
RedisInstrumentor().uninstrument()

def _check_span(self, span, name):
self.assertEqual(span.name, name)
self.assertIs(span.status.status_code, trace.StatusCode.UNSET)

def test_basics(self):
self.assertIsNone(self.redis_client.get("cheese"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "GET")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET cheese"
)
self.assertEqual(span.attributes.get("db.redis.args_length"), 2)

def test_pipeline_traced(self):
with self.redis_client.pipeline(transaction=False) as pipeline:
pipeline.set("blah", 32)
pipeline.rpush("foo", "éé")
pipeline.hgetall("xxx")
pipeline.execute()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "SET RPUSH HGETALL")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT),
"SET blah 32\nRPUSH foo éé\nHGETALL xxx",
)
self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3)

def test_parent(self):
"""Ensure OpenTelemetry works with redis."""
ot_tracer = trace.get_tracer("redis_svc")

with ot_tracer.start_as_current_span("redis_get"):
self.assertIsNone(self.redis_client.get("cheese"))

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
child_span, parent_span = spans[0], spans[1]

# confirm the parenting
self.assertIsNone(parent_span.parent)
self.assertIs(child_span.parent, parent_span.get_span_context())

self.assertEqual(parent_span.name, "redis_get")
self.assertEqual(parent_span.instrumentation_info.name, "redis_svc")

self.assertEqual(child_span.name, "GET")


def async_call(coro):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)
Expand Down Expand Up @@ -238,6 +304,77 @@ def test_parent(self):
self.assertEqual(child_span.name, "GET")


class TestAsyncRedisClusterInstrument(TestBase):
def setUp(self):
super().setUp()
self.redis_client = redis.asyncio.cluster.RedisCluster(
host="localhost", port=7000
)
async_call(self.redis_client.flushall())
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)

def tearDown(self):
super().tearDown()
RedisInstrumentor().uninstrument()

def _check_span(self, span, name):
self.assertEqual(span.name, name)
self.assertIs(span.status.status_code, trace.StatusCode.UNSET)

def test_basics(self):
self.assertIsNone(async_call(self.redis_client.get("cheese")))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "GET")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET cheese"
)
self.assertEqual(span.attributes.get("db.redis.args_length"), 2)

def test_pipeline_traced(self):
async def pipeline_simple():
async with self.redis_client.pipeline(
transaction=False
) as pipeline:
pipeline.set("blah", 32)
pipeline.rpush("foo", "éé")
pipeline.hgetall("xxx")
await pipeline.execute()

async_call(pipeline_simple())

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "SET RPUSH HGETALL")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT),
"SET blah 32\nRPUSH foo éé\nHGETALL xxx",
)
self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3)

def test_parent(self):
"""Ensure OpenTelemetry works with redis."""
ot_tracer = trace.get_tracer("redis_svc")

with ot_tracer.start_as_current_span("redis_get"):
self.assertIsNone(async_call(self.redis_client.get("cheese")))

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
child_span, parent_span = spans[0], spans[1]

# confirm the parenting
self.assertIsNone(parent_span.parent)
self.assertIs(child_span.parent, parent_span.get_span_context())

self.assertEqual(parent_span.name, "redis_get")
self.assertEqual(parent_span.instrumentation_info.name, "redis_svc")

self.assertEqual(child_span.name, "GET")


class TestRedisDBIndexInstrument(TestBase):
def setUp(self):
super().setUp()
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ deps =
psycopg2 ~= 2.8.4
aiopg >= 0.13.0, < 1.3.0
sqlalchemy ~= 1.4
redis ~= 4.2
redis ~= 4.3
celery[pytest] >= 4.0, < 6.0
protobuf~=3.13
requests==2.25.0
Expand Down