Skip to content

Commit

Permalink
Bugfix for open-telemetry#257 - properly stream responses
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Stella committed Dec 15, 2020
1 parent 187987d commit 837770f
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ def serve():
# pylint:disable=unused-argument
# isort:skip

import logging
log = logging.getLogger(__name__)


class GrpcInstrumentorServer(BaseInstrumentor):
"""
Expand Down Expand Up @@ -184,30 +187,44 @@ class GrpcInstrumentorClient(BaseInstrumentor):
grpc_client_instrumentor = GrpcInstrumentorClient()
grpc.client_instrumentor.instrument()
Instrumetor arguments:
wrap_secure (bool): False to disable wrapping secure channels
wrap_insecure (bool): False to disable wrapping insecure channels
exporter: OpenTelemetry metrics exporter
interval (int): metrics export interval
"""

def _instrument(self, **kwargs):
exporter = kwargs.get("exporter", None)
interval = kwargs.get("interval", 30)
if kwargs.get("channel_type") == "secure":

# preserve the old argument
if "wrap_secure" not in kwargs and kwargs.get("channel_type", "") == "secure":
kwargs["wrap_secure"] = True
kwargs["wrap_insecure"] = False

if kwargs.get("wrap_secure", True):
log.info("wrapping secure channels")
_wrap(
"grpc",
"secure_channel",
partial(self.wrapper_fn, exporter, interval),
)

else:
if kwargs.get("wrap_insecure", True):
log.info("wrapping insecure channels")
_wrap(
"grpc",
"insecure_channel",
partial(self.wrapper_fn, exporter, interval),
)

def _uninstrument(self, **kwargs):
if kwargs.get("channel_type") == "secure":
if kwargs.get("wrap_secure", True):
unwrap(grpc, "secure_channel")

else:
#else:
if kwargs.get("wrap_insecure", True):
unwrap(grpc, "insecure_channel")

def wrapper_fn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,15 @@ def intercept_service(self, continuation, handler_call_details):
def telemetry_wrapper(behavior, request_streaming, response_streaming):
def telemetry_interceptor(request_or_iterator, context):

# handle streaming responses specially
if response_streaming:
return self._intercept_server_stream(
behavior,
handler_call_details,
request_or_iterator,
context,
)

with self._set_remote_context(context):
with self._start_span(
handler_call_details, context
Expand All @@ -249,6 +258,7 @@ def telemetry_interceptor(request_or_iterator, context):
# And now we run the actual RPC.
try:
return behavior(request_or_iterator, context)

except Exception as error:
# Bare exceptions are likely to be gRPC aborts, which
# we handle in our context wrapper.
Expand All @@ -263,3 +273,23 @@ def telemetry_interceptor(request_or_iterator, context):
return _wrap_rpc_behavior(
continuation(handler_call_details), telemetry_wrapper
)

# Handle streaming responses separately - we have to do this
# to return a *new* generator or various upstream things
# get confused, or we'll lose the consistent trace
def _intercept_server_stream(
self, behavior, handler_call_details, request_or_iterator, context
):

with self._set_remote_context(context):
with self._start_span(handler_call_details, context) as span:
context = _OpenTelemetryServicerContext(context, span)

try:
yield from behavior(request_or_iterator, context)

except Exception as error:
# pylint:disable=unidiomatic-typecheck
if type(error) != Exception:
span.record_exception(error)
raise error

0 comments on commit 837770f

Please sign in to comment.