Skip to content

Commit

Permalink
Instrument RedisCluster clients (#1177)
Browse files Browse the repository at this point in the history
* Instrument RedisCluster clients

* reformat files

* update changelogs

* refactor _traced_execute_pipeline

* handle AttributeError

* handle IndexError

* refactor _traced_execute_pipeline

* move hasattr check to _set_connection_attributes function

Co-authored-by: Srikanth Chekuri <[email protected]>
  • Loading branch information
sungwonh and srikanthccv authored Jul 7, 2022
1 parent ee40839 commit 8823655
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 6 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.12.0rc2-0.32b0...HEAD)

### Added
- `opentelemetry-instrumentation-redis` add support to instrument RedisCluster clients
([#1177](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1177))

## [1.12.0rc2-0.32b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc2-0.32b0) - 2022-07-01


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,12 @@ def response_hook(span, instance, response):
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
import redis.asyncio

_REDIS_CLUSTER_VERSION = (4, 1, 0)
_REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 0)


def _set_connection_attributes(span, conn):
if not span.is_recording():
if not span.is_recording() or not hasattr(conn, "connection_pool"):
return
for key, value in _extract_conn_attributes(
conn.connection_pool.connection_kwargs
Expand Down Expand Up @@ -159,10 +162,29 @@ def _traced_execute_command(func, instance, args, kwargs):
return response

def _traced_execute_pipeline(func, instance, args, kwargs):
cmds = [_format_command_args(c) for c, _ in instance.command_stack]
resource = "\n".join(cmds)
try:
command_stack = (
instance.command_stack
if hasattr(instance, "command_stack")
else instance._command_stack
)

span_name = " ".join([args[0] for args, _ in instance.command_stack])
cmds = [
_format_command_args(c.args if hasattr(c, "args") else c[0])
for c in command_stack
]
resource = "\n".join(cmds)

span_name = " ".join(
[
(c.args[0] if hasattr(c, "args") else c[0][0])
for c in command_stack
]
)
except (AttributeError, IndexError):
command_stack = []
resource = ""
span_name = ""

with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CLIENT
Expand All @@ -171,7 +193,7 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
span.set_attribute(SpanAttributes.DB_STATEMENT, resource)
_set_connection_attributes(span, instance)
span.set_attribute(
"db.redis.pipeline_length", len(instance.command_stack)
"db.redis.pipeline_length", len(command_stack)
)
response = func(*args, **kwargs)
if callable(response_hook):
Expand All @@ -196,6 +218,17 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
f"{pipeline_class}.immediate_execute_command",
_traced_execute_command,
)
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
wrap_function_wrapper(
"redis.cluster",
"RedisCluster.execute_command",
_traced_execute_command,
)
wrap_function_wrapper(
"redis.cluster",
"ClusterPipeline.execute",
_traced_execute_pipeline,
)
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
wrap_function_wrapper(
"redis.asyncio",
Expand All @@ -212,6 +245,17 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
f"{pipeline_class}.immediate_execute_command",
_traced_execute_command,
)
if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION:
wrap_function_wrapper(
"redis.asyncio.cluster",
"RedisCluster.execute_command",
_traced_execute_command,
)
wrap_function_wrapper(
"redis.asyncio.cluster",
"ClusterPipeline.execute",
_traced_execute_pipeline,
)


class RedisInstrumentor(BaseInstrumentor):
Expand Down Expand Up @@ -258,8 +302,14 @@ def _uninstrument(self, **kwargs):
unwrap(redis.Redis, "pipeline")
unwrap(redis.client.Pipeline, "execute")
unwrap(redis.client.Pipeline, "immediate_execute_command")
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
unwrap(redis.cluster.RedisCluster, "execute_command")
unwrap(redis.cluster.ClusterPipeline, "execute")
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
unwrap(redis.asyncio.Redis, "execute_command")
unwrap(redis.asyncio.Redis, "pipeline")
unwrap(redis.asyncio.client.Pipeline, "execute")
unwrap(redis.asyncio.client.Pipeline, "immediate_execute_command")
if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION:
unwrap(redis.asyncio.cluster.RedisCluster, "execute_command")
unwrap(redis.asyncio.cluster.ClusterPipeline, "execute")
11 changes: 11 additions & 0 deletions tests/opentelemetry-docker-tests/tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ services:
image: redis:4.0-alpine
ports:
- "127.0.0.1:6379:6379"
otrediscluster:
image: grokzen/redis-cluster:6.2.0
environment:
- IP=0.0.0.0
ports:
- "127.0.0.1:7000:7000"
- "127.0.0.1:7001:7001"
- "127.0.0.1:7002:7002"
- "127.0.0.1:7003:7003"
- "127.0.0.1:7004:7004"
- "127.0.0.1:7005:7005"
otjaeger:
image: jaegertracing/all-in-one:1.8
environment:
Expand Down
137 changes: 137 additions & 0 deletions tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,72 @@ def test_parent(self):
self.assertEqual(child_span.name, "GET")


class TestRedisClusterInstrument(TestBase):
def setUp(self):
super().setUp()
self.redis_client = redis.cluster.RedisCluster(
host="localhost", port=7000
)
self.redis_client.flushall()
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)

def tearDown(self):
super().tearDown()
RedisInstrumentor().uninstrument()

def _check_span(self, span, name):
self.assertEqual(span.name, name)
self.assertIs(span.status.status_code, trace.StatusCode.UNSET)

def test_basics(self):
self.assertIsNone(self.redis_client.get("cheese"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "GET")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET cheese"
)
self.assertEqual(span.attributes.get("db.redis.args_length"), 2)

def test_pipeline_traced(self):
with self.redis_client.pipeline(transaction=False) as pipeline:
pipeline.set("blah", 32)
pipeline.rpush("foo", "éé")
pipeline.hgetall("xxx")
pipeline.execute()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "SET RPUSH HGETALL")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT),
"SET blah 32\nRPUSH foo éé\nHGETALL xxx",
)
self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3)

def test_parent(self):
"""Ensure OpenTelemetry works with redis."""
ot_tracer = trace.get_tracer("redis_svc")

with ot_tracer.start_as_current_span("redis_get"):
self.assertIsNone(self.redis_client.get("cheese"))

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
child_span, parent_span = spans[0], spans[1]

# confirm the parenting
self.assertIsNone(parent_span.parent)
self.assertIs(child_span.parent, parent_span.get_span_context())

self.assertEqual(parent_span.name, "redis_get")
self.assertEqual(parent_span.instrumentation_info.name, "redis_svc")

self.assertEqual(child_span.name, "GET")


def async_call(coro):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)
Expand Down Expand Up @@ -238,6 +304,77 @@ def test_parent(self):
self.assertEqual(child_span.name, "GET")


class TestAsyncRedisClusterInstrument(TestBase):
def setUp(self):
super().setUp()
self.redis_client = redis.asyncio.cluster.RedisCluster(
host="localhost", port=7000
)
async_call(self.redis_client.flushall())
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)

def tearDown(self):
super().tearDown()
RedisInstrumentor().uninstrument()

def _check_span(self, span, name):
self.assertEqual(span.name, name)
self.assertIs(span.status.status_code, trace.StatusCode.UNSET)

def test_basics(self):
self.assertIsNone(async_call(self.redis_client.get("cheese")))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "GET")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET cheese"
)
self.assertEqual(span.attributes.get("db.redis.args_length"), 2)

def test_pipeline_traced(self):
async def pipeline_simple():
async with self.redis_client.pipeline(
transaction=False
) as pipeline:
pipeline.set("blah", 32)
pipeline.rpush("foo", "éé")
pipeline.hgetall("xxx")
await pipeline.execute()

async_call(pipeline_simple())

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "SET RPUSH HGETALL")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT),
"SET blah 32\nRPUSH foo éé\nHGETALL xxx",
)
self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3)

def test_parent(self):
"""Ensure OpenTelemetry works with redis."""
ot_tracer = trace.get_tracer("redis_svc")

with ot_tracer.start_as_current_span("redis_get"):
self.assertIsNone(async_call(self.redis_client.get("cheese")))

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
child_span, parent_span = spans[0], spans[1]

# confirm the parenting
self.assertIsNone(parent_span.parent)
self.assertIs(child_span.parent, parent_span.get_span_context())

self.assertEqual(parent_span.name, "redis_get")
self.assertEqual(parent_span.instrumentation_info.name, "redis_svc")

self.assertEqual(child_span.name, "GET")


class TestRedisDBIndexInstrument(TestBase):
def setUp(self):
super().setUp()
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ deps =
psycopg2 ~= 2.8.4
aiopg >= 0.13.0, < 1.3.0
sqlalchemy ~= 1.4
redis ~= 4.2
redis ~= 4.3
celery[pytest] >= 4.0, < 6.0
protobuf~=3.13
requests==2.25.0
Expand Down

0 comments on commit 8823655

Please sign in to comment.