Skip to content

Commit

Permalink
instrumentation/grpc: Testing for gRPC Client Interceptor (#896)
Browse files Browse the repository at this point in the history
  • Loading branch information
cnnradams authored Jul 16, 2020
1 parent 2a952b3 commit ee9d28a
Show file tree
Hide file tree
Showing 10 changed files with 775 additions and 8 deletions.
3 changes: 3 additions & 0 deletions ext/opentelemetry-ext-grpc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Add status code to gRPC client spans
([896](https://github.com/open-telemetry/opentelemetry-python/pull/896))

## 0.8b0

Released 2020-05-27
Expand Down
1 change: 1 addition & 0 deletions ext/opentelemetry-ext-grpc/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ install_requires =
test =
opentelemetry-test == 0.11.dev0
opentelemetry-sdk == 0.11.dev0
protobuf == 3.12.2

[options.packages.find]
where = src
40 changes: 33 additions & 7 deletions ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import grpc

from opentelemetry import propagators, trace
from opentelemetry.trace.status import Status, StatusCanonicalCode

from . import grpcext
from ._utilities import RpcInfo
Expand All @@ -33,14 +34,16 @@
class _GuardedSpan:
def __init__(self, span):
self.span = span
self.generated_span = None
self._engaged = True

def __enter__(self):
self.span.__enter__()
self.generated_span = self.span.__enter__()
return self

def __exit__(self, *args, **kwargs):
if self._engaged:
self.generated_span = None
return self.span.__exit__(*args, **kwargs)
return False

Expand Down Expand Up @@ -122,7 +125,15 @@ def intercept_unary(self, request, metadata, client_info, invoker):
timeout=client_info.timeout,
request=request,
)
result = invoker(request, metadata)

try:
result = invoker(request, metadata)
except grpc.RpcError as exc:
guarded_span.generated_span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
)
raise

return self._trace_result(guarded_span, rpc_info, result)

# For RPCs that stream responses, the result can be a generator. To record
Expand All @@ -136,7 +147,7 @@ def _intercept_server_stream(
else:
mutable_metadata = OrderedDict(metadata)

with self._start_span(client_info.full_method):
with self._start_span(client_info.full_method) as span:
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
Expand All @@ -146,9 +157,16 @@ def _intercept_server_stream(
)
if client_info.is_client_stream:
rpc_info.request = request_or_iterator
result = invoker(request_or_iterator, metadata)
for response in result:
yield response

try:
result = invoker(request_or_iterator, metadata)
for response in result:
yield response
except grpc.RpcError as exc:
span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
)
raise

def intercept_stream(
self, request_or_iterator, metadata, client_info, invoker
Expand All @@ -172,5 +190,13 @@ def intercept_stream(
timeout=client_info.timeout,
request=request_or_iterator,
)
result = invoker(request_or_iterator, metadata)

try:
result = invoker(request_or_iterator, metadata)
except grpc.RpcError as exc:
guarded_span.generated_span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
)
raise

return self._trace_result(guarded_span, rpc_info, result)
57 changes: 57 additions & 0 deletions ext/opentelemetry-ext-grpc/tests/_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .protobuf.test_server_pb2 import Request

CLIENT_ID = 1


def simple_method(stub, error=False):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
stub.SimpleMethod(request)


def client_streaming_method(stub, error=False):
# create a generator
def request_messages():
for _ in range(5):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
yield request

stub.ClientStreamingMethod(request_messages())


def server_streaming_method(stub, error=False):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
response_iterator = stub.ServerStreamingMethod(request)
list(response_iterator)


def bidirectional_streaming_method(stub, error=False):
def request_messages():
for _ in range(5):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
yield request

response_iterator = stub.BidirectionalStreamingMethod(request_messages())

list(response_iterator)
87 changes: 87 additions & 0 deletions ext/opentelemetry-ext-grpc/tests/_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent import futures

import grpc

from .protobuf import test_server_pb2, test_server_pb2_grpc

SERVER_ID = 1


class TestServer(test_server_pb2_grpc.GRPCTestServerServicer):
def SimpleMethod(self, request, context):
if request.request_data == "error":
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return test_server_pb2.Response()
response = test_server_pb2.Response(
server_id=SERVER_ID, response_data="data"
)
return response

def ClientStreamingMethod(self, request_iterator, context):
data = list(request_iterator)
if data[0].request_data == "error":
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return test_server_pb2.Response()
response = test_server_pb2.Response(
server_id=SERVER_ID, response_data="data"
)
return response

def ServerStreamingMethod(self, request, context):
if request.request_data == "error":

context.abort(
code=grpc.StatusCode.INVALID_ARGUMENT,
details="server stream error",
)
return test_server_pb2.Response()

# create a generator
def response_messages():
for _ in range(5):
response = test_server_pb2.Response(
server_id=SERVER_ID, response_data="data"
)
yield response

return response_messages()

def BidirectionalStreamingMethod(self, request_iterator, context):
data = list(request_iterator)
if data[0].request_data == "error":
context.abort(
code=grpc.StatusCode.INVALID_ARGUMENT,
details="bidirectional error",
)
return

for _ in range(5):
yield test_server_pb2.Response(
server_id=SERVER_ID, response_data="data"
)


def create_test_server(port):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))

test_server_pb2_grpc.add_GRPCTestServerServicer_to_server(
TestServer(), server
)

server.add_insecure_port("localhost:{}".format(port))

return server
34 changes: 34 additions & 0 deletions ext/opentelemetry-ext-grpc/tests/protobuf/test_server.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";

message Request {
int64 client_id = 1;
string request_data = 2;
}

message Response {
int64 server_id = 1;
string response_data = 2;
}

service GRPCTestServer {
rpc SimpleMethod (Request) returns (Response);

rpc ClientStreamingMethod (stream Request) returns (Response);

rpc ServerStreamingMethod (Request) returns (stream Response);

rpc BidirectionalStreamingMethod (stream Request) returns (stream Response);
}
Loading

0 comments on commit ee9d28a

Please sign in to comment.