Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument RedisCluster clients #1177

Merged
merged 10 commits into from
Jul 7, 2022
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.12.0rc2-0.32b0...HEAD)

### Added
- `opentelemetry-instrumentation-redis` add support to instrument RedisCluster clients
([#1177](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1177))

## [1.12.0rc2-0.32b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc2-0.32b0) - 2022-07-01


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ def response_hook(span, instance, response):
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
import redis.asyncio

_REDIS_CLUSTER_VERSION = (4, 1, 0)
_REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 0)


def _set_connection_attributes(span, conn):
if not span.is_recording():
Expand Down Expand Up @@ -149,7 +152,8 @@ def _traced_execute_command(func, instance, args, kwargs):
) as span:
if span.is_recording():
span.set_attribute(SpanAttributes.DB_STATEMENT, query)
_set_connection_attributes(span, instance)
if hasattr(instance, "connection_pool"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add the hasattr check in the _set_connection_attributes function?

Copy link
Contributor Author

@sungwonh sungwonh Jul 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved hasattr check to the _set_connection_attributes function.

_set_connection_attributes(span, instance)
span.set_attribute("db.redis.args_length", len(args))
if callable(request_hook):
request_hook(span, instance, args, kwargs)
Expand All @@ -159,19 +163,37 @@ def _traced_execute_command(func, instance, args, kwargs):
return response

def _traced_execute_pipeline(func, instance, args, kwargs):
cmds = [_format_command_args(c) for c, _ in instance.command_stack]
try:
command_stack = (
instance.command_stack
if hasattr(instance, "command_stack")
else instance._command_stack
)
except AttributeError:
command_stack = []

cmds = [
_format_command_args(c.args if hasattr(c, "args") else c[0])
for c in command_stack
]
resource = "\n".join(cmds)

span_name = " ".join([args[0] for args, _ in instance.command_stack])
span_name = " ".join(
[
(c.args[0] if hasattr(c, "args") else c[0][0])
for c in command_stack
]
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also suggest to safe guard this code for potential IndexErrors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to handle IndexError.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to handle AttributeError and IndexError in one try block.


with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span.set_attribute(SpanAttributes.DB_STATEMENT, resource)
_set_connection_attributes(span, instance)
if hasattr(instance, "connection_pool"):
_set_connection_attributes(span, instance)
span.set_attribute(
"db.redis.pipeline_length", len(instance.command_stack)
"db.redis.pipeline_length", len(command_stack)
)
response = func(*args, **kwargs)
if callable(response_hook):
Expand All @@ -196,6 +218,17 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
f"{pipeline_class}.immediate_execute_command",
_traced_execute_command,
)
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
wrap_function_wrapper(
"redis.cluster",
"RedisCluster.execute_command",
_traced_execute_command,
)
wrap_function_wrapper(
"redis.cluster",
"ClusterPipeline.execute",
_traced_execute_pipeline,
)
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
wrap_function_wrapper(
"redis.asyncio",
Expand All @@ -212,6 +245,17 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
f"{pipeline_class}.immediate_execute_command",
_traced_execute_command,
)
if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION:
wrap_function_wrapper(
"redis.asyncio.cluster",
"RedisCluster.execute_command",
_traced_execute_command,
)
wrap_function_wrapper(
"redis.asyncio.cluster",
"ClusterPipeline.execute",
_traced_execute_pipeline,
)


class RedisInstrumentor(BaseInstrumentor):
Expand Down Expand Up @@ -258,8 +302,14 @@ def _uninstrument(self, **kwargs):
unwrap(redis.Redis, "pipeline")
unwrap(redis.client.Pipeline, "execute")
unwrap(redis.client.Pipeline, "immediate_execute_command")
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
unwrap(redis.cluster.RedisCluster, "execute_command")
unwrap(redis.cluster.ClusterPipeline, "execute")
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
unwrap(redis.asyncio.Redis, "execute_command")
unwrap(redis.asyncio.Redis, "pipeline")
unwrap(redis.asyncio.client.Pipeline, "execute")
unwrap(redis.asyncio.client.Pipeline, "immediate_execute_command")
if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION:
unwrap(redis.asyncio.cluster.RedisCluster, "execute_command")
unwrap(redis.asyncio.cluster.ClusterPipeline, "execute")
11 changes: 11 additions & 0 deletions tests/opentelemetry-docker-tests/tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ services:
image: redis:4.0-alpine
ports:
- "127.0.0.1:6379:6379"
otrediscluster:
image: grokzen/redis-cluster:6.2.0
environment:
- IP=0.0.0.0
ports:
- "127.0.0.1:7000:7000"
- "127.0.0.1:7001:7001"
- "127.0.0.1:7002:7002"
- "127.0.0.1:7003:7003"
- "127.0.0.1:7004:7004"
- "127.0.0.1:7005:7005"
otjaeger:
image: jaegertracing/all-in-one:1.8
environment:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,72 @@ def test_parent(self):
self.assertEqual(child_span.name, "GET")


class TestRedisClusterInstrument(TestBase):
def setUp(self):
super().setUp()
self.redis_client = redis.cluster.RedisCluster(
host="localhost", port=7000
)
self.redis_client.flushall()
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)

def tearDown(self):
super().tearDown()
RedisInstrumentor().uninstrument()

def _check_span(self, span, name):
self.assertEqual(span.name, name)
self.assertIs(span.status.status_code, trace.StatusCode.UNSET)

def test_basics(self):
self.assertIsNone(self.redis_client.get("cheese"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "GET")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET cheese"
)
self.assertEqual(span.attributes.get("db.redis.args_length"), 2)

def test_pipeline_traced(self):
with self.redis_client.pipeline(transaction=False) as pipeline:
pipeline.set("blah", 32)
pipeline.rpush("foo", "éé")
pipeline.hgetall("xxx")
pipeline.execute()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "SET RPUSH HGETALL")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT),
"SET blah 32\nRPUSH foo éé\nHGETALL xxx",
)
self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3)

def test_parent(self):
"""Ensure OpenTelemetry works with redis."""
ot_tracer = trace.get_tracer("redis_svc")

with ot_tracer.start_as_current_span("redis_get"):
self.assertIsNone(self.redis_client.get("cheese"))

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
child_span, parent_span = spans[0], spans[1]

# confirm the parenting
self.assertIsNone(parent_span.parent)
self.assertIs(child_span.parent, parent_span.get_span_context())

self.assertEqual(parent_span.name, "redis_get")
self.assertEqual(parent_span.instrumentation_info.name, "redis_svc")

self.assertEqual(child_span.name, "GET")


def async_call(coro):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)
Expand Down Expand Up @@ -238,6 +304,77 @@ def test_parent(self):
self.assertEqual(child_span.name, "GET")


class TestAsyncRedisClusterInstrument(TestBase):
def setUp(self):
super().setUp()
self.redis_client = redis.asyncio.cluster.RedisCluster(
host="localhost", port=7000
)
async_call(self.redis_client.flushall())
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)

def tearDown(self):
super().tearDown()
RedisInstrumentor().uninstrument()

def _check_span(self, span, name):
self.assertEqual(span.name, name)
self.assertIs(span.status.status_code, trace.StatusCode.UNSET)

def test_basics(self):
self.assertIsNone(async_call(self.redis_client.get("cheese")))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "GET")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET cheese"
)
self.assertEqual(span.attributes.get("db.redis.args_length"), 2)

def test_pipeline_traced(self):
async def pipeline_simple():
async with self.redis_client.pipeline(
transaction=False
) as pipeline:
pipeline.set("blah", 32)
pipeline.rpush("foo", "éé")
pipeline.hgetall("xxx")
await pipeline.execute()

async_call(pipeline_simple())

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span, "SET RPUSH HGETALL")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT),
"SET blah 32\nRPUSH foo éé\nHGETALL xxx",
)
self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3)

def test_parent(self):
"""Ensure OpenTelemetry works with redis."""
ot_tracer = trace.get_tracer("redis_svc")

with ot_tracer.start_as_current_span("redis_get"):
self.assertIsNone(async_call(self.redis_client.get("cheese")))

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
child_span, parent_span = spans[0], spans[1]

# confirm the parenting
self.assertIsNone(parent_span.parent)
self.assertIs(child_span.parent, parent_span.get_span_context())

self.assertEqual(parent_span.name, "redis_get")
self.assertEqual(parent_span.instrumentation_info.name, "redis_svc")

self.assertEqual(child_span.name, "GET")


class TestRedisDBIndexInstrument(TestBase):
def setUp(self):
super().setUp()
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ deps =
psycopg2 ~= 2.8.4
aiopg >= 0.13.0, < 1.3.0
sqlalchemy ~= 1.4
redis ~= 4.2
redis ~= 4.3
celery[pytest] >= 4.0, < 6.0
protobuf~=3.13
requests==2.25.0
Expand Down