Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testing for gRPC Client Interceptor #896

Merged
merged 4 commits into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ext/opentelemetry-ext-grpc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Add status code to gRPC client spans
([896](https://github.com/open-telemetry/opentelemetry-python/pull/896))

## 0.8b0

Released 2020-05-27
Expand Down
1 change: 1 addition & 0 deletions ext/opentelemetry-ext-grpc/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ install_requires =
test =
opentelemetry-test == 0.11.dev0
opentelemetry-sdk == 0.11.dev0
protobuf == 3.12.2

[options.packages.find]
where = src
40 changes: 33 additions & 7 deletions ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import grpc

from opentelemetry import propagators, trace
from opentelemetry.trace.status import Status, StatusCanonicalCode

from . import grpcext
from ._utilities import RpcInfo
Expand All @@ -33,14 +34,16 @@
class _GuardedSpan:
def __init__(self, span):
self.span = span
self.generated_span = None
self._engaged = True

def __enter__(self):
self.span.__enter__()
self.generated_span = self.span.__enter__()
return self

def __exit__(self, *args, **kwargs):
if self._engaged:
self.generated_span = None
return self.span.__exit__(*args, **kwargs)
return False

Expand Down Expand Up @@ -122,7 +125,15 @@ def intercept_unary(self, request, metadata, client_info, invoker):
timeout=client_info.timeout,
request=request,
)
result = invoker(request, metadata)

try:
result = invoker(request, metadata)
except grpc.RpcError as exc:
guarded_span.generated_span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
)
raise

return self._trace_result(guarded_span, rpc_info, result)

# For RPCs that stream responses, the result can be a generator. To record
Expand All @@ -136,7 +147,7 @@ def _intercept_server_stream(
else:
mutable_metadata = OrderedDict(metadata)

with self._start_span(client_info.full_method):
with self._start_span(client_info.full_method) as span:
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
Expand All @@ -146,9 +157,16 @@ def _intercept_server_stream(
)
if client_info.is_client_stream:
rpc_info.request = request_or_iterator
result = invoker(request_or_iterator, metadata)
for response in result:
yield response

try:
result = invoker(request_or_iterator, metadata)
for response in result:
yield response
except grpc.RpcError as exc:
span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
)
raise

def intercept_stream(
self, request_or_iterator, metadata, client_info, invoker
Expand All @@ -172,5 +190,13 @@ def intercept_stream(
timeout=client_info.timeout,
request=request_or_iterator,
)
result = invoker(request_or_iterator, metadata)

try:
result = invoker(request_or_iterator, metadata)
except grpc.RpcError as exc:
guarded_span.generated_span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
)
raise

return self._trace_result(guarded_span, rpc_info, result)
57 changes: 57 additions & 0 deletions ext/opentelemetry-ext-grpc/tests/_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .protobuf.test_server_pb2 import Request

CLIENT_ID = 1


def simple_method(stub, error=False):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
stub.SimpleMethod(request)


def client_streaming_method(stub, error=False):
# create a generator
def request_messages():
for _ in range(5):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
yield request

stub.ClientStreamingMethod(request_messages())


def server_streaming_method(stub, error=False):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
response_iterator = stub.ServerStreamingMethod(request)
list(response_iterator)


def bidirectional_streaming_method(stub, error=False):
def request_messages():
for _ in range(5):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
yield request

response_iterator = stub.BidirectionalStreamingMethod(request_messages())

list(response_iterator)
87 changes: 87 additions & 0 deletions ext/opentelemetry-ext-grpc/tests/_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent import futures

import grpc

from .protobuf import test_server_pb2, test_server_pb2_grpc

SERVER_ID = 1


class TestServer(test_server_pb2_grpc.GRPCTestServerServicer):
def SimpleMethod(self, request, context):
if request.request_data == "error":
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return test_server_pb2.Response()
response = test_server_pb2.Response(
server_id=SERVER_ID, response_data="data"
)
return response

def ClientStreamingMethod(self, request_iterator, context):
data = list(request_iterator)
if data[0].request_data == "error":
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return test_server_pb2.Response()
response = test_server_pb2.Response(
server_id=SERVER_ID, response_data="data"
)
return response

def ServerStreamingMethod(self, request, context):
if request.request_data == "error":

context.abort(
code=grpc.StatusCode.INVALID_ARGUMENT,
details="server stream error",
)
return test_server_pb2.Response()

# create a generator
def response_messages():
for _ in range(5):
response = test_server_pb2.Response(
server_id=SERVER_ID, response_data="data"
)
yield response

return response_messages()

def BidirectionalStreamingMethod(self, request_iterator, context):
data = list(request_iterator)
if data[0].request_data == "error":
context.abort(
code=grpc.StatusCode.INVALID_ARGUMENT,
details="bidirectional error",
)
return

for _ in range(5):
yield test_server_pb2.Response(
server_id=SERVER_ID, response_data="data"
)


def create_test_server(port):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))

test_server_pb2_grpc.add_GRPCTestServerServicer_to_server(
TestServer(), server
)

server.add_insecure_port("localhost:{}".format(port))

return server
34 changes: 34 additions & 0 deletions ext/opentelemetry-ext-grpc/tests/protobuf/test_server.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";

message Request {
int64 client_id = 1;
string request_data = 2;
}

message Response {
int64 server_id = 1;
string response_data = 2;
}

service GRPCTestServer {
rpc SimpleMethod (Request) returns (Response);

rpc ClientStreamingMethod (stream Request) returns (Response);

rpc ServerStreamingMethod (Request) returns (stream Response);

rpc BidirectionalStreamingMethod (stream Request) returns (stream Response);
}
Loading