Skip to content

Commit

Permalink
Fixes, changelog
Browse files Browse the repository at this point in the history
  • Loading branch information
cnnradams committed Jul 20, 2020
1 parent dc79e3a commit 105ec0d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
2 changes: 2 additions & 0 deletions ext/opentelemetry-ext-grpc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
- Add status code to gRPC client spans
([896](https://github.com/open-telemetry/opentelemetry-python/pull/896))

- Add metric recording (bytes in/out, errors, latency) to gRPC client

## 0.8b0

Released 2020-05-27
Expand Down
38 changes: 22 additions & 16 deletions ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def append_metadata(
propagators.inject(append_metadata, metadata)


def _make_future_done_callback(span, rpc_info):
def _make_future_done_callback(span, rpc_info, client_info, metrics_recorder):
def callback(response_future):
with span:
code = response_future.code()
Expand All @@ -72,6 +72,10 @@ def callback(response_future):
return
response = response_future.result()
rpc_info.response = response
if "ByteSize" in dir(response):
metrics_recorder.record_bytes_in(
response.ByteSize(), client_info.full_method
)

return callback

Expand All @@ -90,12 +94,17 @@ def _start_span(self, method):
)

# pylint:disable=no-self-use
def _trace_result(self, guarded_span, rpc_info, result):
def _trace_result(self, guarded_span, rpc_info, result, client_info):
# If the RPC is called asynchronously, release the guard and add a
# callback so that the span can be finished once the future is done.
if isinstance(result, grpc.Future):
result.add_done_callback(
_make_future_done_callback(guarded_span.release(), rpc_info)
_make_future_done_callback(
guarded_span.release(),
rpc_info,
client_info,
self._metrics_recorder,
)
)
return result
response = result
Expand All @@ -106,6 +115,11 @@ def _trace_result(self, guarded_span, rpc_info, result):
if isinstance(result, tuple):
response = result[0]
rpc_info.response = response

if "ByteSize" in dir(response):
self._metrics_recorder.record_bytes_in(
response.ByteSize(), client_info.full_method
)
return result

def _start_guarded_span(self, *args, **kwargs):
Expand Down Expand Up @@ -154,13 +168,9 @@ def intercept_unary(self, request, metadata, client_info, invoker):
)
raise

ret = self._trace_result(guarded_span, rpc_info, result)

if "ByteSize" in dir(rpc_info.response):
self._metrics_recorder.record_bytes_in(
rpc_info.response.ByteSize(), client_info.full_method
)
return ret
return self._trace_result(
guarded_span, rpc_info, result, client_info
)

# For RPCs that stream responses, the result can be a generator. To record
# the span across the generated responses and detect any errors, we wrap
Expand Down Expand Up @@ -253,10 +263,6 @@ def intercept_stream(
)
raise

ret = self._trace_result(guarded_span, rpc_info, result)

self._metrics_recorder.record_bytes_in(
rpc_info.response.ByteSize(), client_info.full_method
return self._trace_result(
guarded_span, rpc_info, result, client_info
)

return ret

0 comments on commit 105ec0d

Please sign in to comment.