From ee9d28a71ca825a7518ff052dc5bafd3aa5bf2db Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Thu, 16 Jul 2020 11:43:04 -0400 Subject: [PATCH] instrumentation/grpc: Testing for gRPC Client Interceptor (#896) --- ext/opentelemetry-ext-grpc/CHANGELOG.md | 3 + ext/opentelemetry-ext-grpc/setup.cfg | 1 + .../src/opentelemetry/ext/grpc/_client.py | 40 +++- ext/opentelemetry-ext-grpc/tests/_client.py | 57 +++++ ext/opentelemetry-ext-grpc/tests/_server.py | 87 +++++++ .../tests/protobuf/test_server.proto | 34 +++ .../tests/protobuf/test_server_pb2.py | 215 ++++++++++++++++++ .../tests/protobuf/test_server_pb2_grpc.py | 205 +++++++++++++++++ .../tests/test_client_interceptor.py | 139 +++++++++++ tox.ini | 2 +- 10 files changed, 775 insertions(+), 8 deletions(-) create mode 100644 ext/opentelemetry-ext-grpc/tests/_client.py create mode 100644 ext/opentelemetry-ext-grpc/tests/_server.py create mode 100644 ext/opentelemetry-ext-grpc/tests/protobuf/test_server.proto create mode 100644 ext/opentelemetry-ext-grpc/tests/protobuf/test_server_pb2.py create mode 100644 ext/opentelemetry-ext-grpc/tests/protobuf/test_server_pb2_grpc.py create mode 100644 ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py diff --git a/ext/opentelemetry-ext-grpc/CHANGELOG.md b/ext/opentelemetry-ext-grpc/CHANGELOG.md index 2302714790..3be32e05e5 100644 --- a/ext/opentelemetry-ext-grpc/CHANGELOG.md +++ b/ext/opentelemetry-ext-grpc/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- Add status code to gRPC client spans + ([896](https://github.com/open-telemetry/opentelemetry-python/pull/896)) + ## 0.8b0 Released 2020-05-27 diff --git a/ext/opentelemetry-ext-grpc/setup.cfg b/ext/opentelemetry-ext-grpc/setup.cfg index 012b3541d2..2721a3b257 100644 --- a/ext/opentelemetry-ext-grpc/setup.cfg +++ b/ext/opentelemetry-ext-grpc/setup.cfg @@ -47,6 +47,7 @@ install_requires = test = opentelemetry-test == 0.11.dev0 opentelemetry-sdk == 0.11.dev0 + protobuf == 3.12.2 [options.packages.find] where = src diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py index ebf455910c..373d8f345c 100644 --- a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py @@ -25,6 +25,7 @@ import grpc from opentelemetry import propagators, trace +from opentelemetry.trace.status import Status, StatusCanonicalCode from . import grpcext from ._utilities import RpcInfo @@ -33,14 +34,16 @@ class _GuardedSpan: def __init__(self, span): self.span = span + self.generated_span = None self._engaged = True def __enter__(self): - self.span.__enter__() + self.generated_span = self.span.__enter__() return self def __exit__(self, *args, **kwargs): if self._engaged: + self.generated_span = None return self.span.__exit__(*args, **kwargs) return False @@ -122,7 +125,15 @@ def intercept_unary(self, request, metadata, client_info, invoker): timeout=client_info.timeout, request=request, ) - result = invoker(request, metadata) + + try: + result = invoker(request, metadata) + except grpc.RpcError as exc: + guarded_span.generated_span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise + return self._trace_result(guarded_span, rpc_info, result) # For RPCs that stream responses, the result can be a generator. To record @@ -136,7 +147,7 @@ def _intercept_server_stream( else: mutable_metadata = OrderedDict(metadata) - with self._start_span(client_info.full_method): + with self._start_span(client_info.full_method) as span: _inject_span_context(mutable_metadata) metadata = tuple(mutable_metadata.items()) rpc_info = RpcInfo( @@ -146,9 +157,16 @@ def _intercept_server_stream( ) if client_info.is_client_stream: rpc_info.request = request_or_iterator - result = invoker(request_or_iterator, metadata) - for response in result: - yield response + + try: + result = invoker(request_or_iterator, metadata) + for response in result: + yield response + except grpc.RpcError as exc: + span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise def intercept_stream( self, request_or_iterator, metadata, client_info, invoker @@ -172,5 +190,13 @@ def intercept_stream( timeout=client_info.timeout, request=request_or_iterator, ) - result = invoker(request_or_iterator, metadata) + + try: + result = invoker(request_or_iterator, metadata) + except grpc.RpcError as exc: + guarded_span.generated_span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise + return self._trace_result(guarded_span, rpc_info, result) diff --git a/ext/opentelemetry-ext-grpc/tests/_client.py b/ext/opentelemetry-ext-grpc/tests/_client.py new file mode 100644 index 0000000000..43310b5f65 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/tests/_client.py @@ -0,0 +1,57 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .protobuf.test_server_pb2 import Request + +CLIENT_ID = 1 + + +def simple_method(stub, error=False): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + stub.SimpleMethod(request) + + +def client_streaming_method(stub, error=False): + # create a generator + def request_messages(): + for _ in range(5): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + yield request + + stub.ClientStreamingMethod(request_messages()) + + +def server_streaming_method(stub, error=False): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + response_iterator = stub.ServerStreamingMethod(request) + list(response_iterator) + + +def bidirectional_streaming_method(stub, error=False): + def request_messages(): + for _ in range(5): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + yield request + + response_iterator = stub.BidirectionalStreamingMethod(request_messages()) + + list(response_iterator) diff --git a/ext/opentelemetry-ext-grpc/tests/_server.py b/ext/opentelemetry-ext-grpc/tests/_server.py new file mode 100644 index 0000000000..a4e1c266b8 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/tests/_server.py @@ -0,0 +1,87 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent import futures + +import grpc + +from .protobuf import test_server_pb2, test_server_pb2_grpc + +SERVER_ID = 1 + + +class TestServer(test_server_pb2_grpc.GRPCTestServerServicer): + def SimpleMethod(self, request, context): + if request.request_data == "error": + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return test_server_pb2.Response() + response = test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + return response + + def ClientStreamingMethod(self, request_iterator, context): + data = list(request_iterator) + if data[0].request_data == "error": + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return test_server_pb2.Response() + response = test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + return response + + def ServerStreamingMethod(self, request, context): + if request.request_data == "error": + + context.abort( + code=grpc.StatusCode.INVALID_ARGUMENT, + details="server stream error", + ) + return test_server_pb2.Response() + + # create a generator + def response_messages(): + for _ in range(5): + response = test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + yield response + + return response_messages() + + def BidirectionalStreamingMethod(self, request_iterator, context): + data = list(request_iterator) + if data[0].request_data == "error": + context.abort( + code=grpc.StatusCode.INVALID_ARGUMENT, + details="bidirectional error", + ) + return + + for _ in range(5): + yield test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + + +def create_test_server(port): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) + + test_server_pb2_grpc.add_GRPCTestServerServicer_to_server( + TestServer(), server + ) + + server.add_insecure_port("localhost:{}".format(port)) + + return server diff --git a/ext/opentelemetry-ext-grpc/tests/protobuf/test_server.proto b/ext/opentelemetry-ext-grpc/tests/protobuf/test_server.proto new file mode 100644 index 0000000000..790a7675de --- /dev/null +++ b/ext/opentelemetry-ext-grpc/tests/protobuf/test_server.proto @@ -0,0 +1,34 @@ +// Copyright 2019 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +syntax = "proto3"; + +message Request { + int64 client_id = 1; + string request_data = 2; +} + +message Response { + int64 server_id = 1; + string response_data = 2; +} + +service GRPCTestServer { + rpc SimpleMethod (Request) returns (Response); + + rpc ClientStreamingMethod (stream Request) returns (Response); + + rpc ServerStreamingMethod (Request) returns (stream Response); + + rpc BidirectionalStreamingMethod (stream Request) returns (stream Response); +} diff --git a/ext/opentelemetry-ext-grpc/tests/protobuf/test_server_pb2.py b/ext/opentelemetry-ext-grpc/tests/protobuf/test_server_pb2.py new file mode 100644 index 0000000000..735206f850 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/tests/protobuf/test_server_pb2.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test_server.proto + +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor.FileDescriptor( + name="test_server.proto", + package="", + syntax="proto3", + serialized_options=None, + serialized_pb=b'\n\x11test_server.proto"2\n\x07Request\x12\x11\n\tclient_id\x18\x01 \x01(\x03\x12\x14\n\x0crequest_data\x18\x02 \x01(\t"4\n\x08Response\x12\x11\n\tserver_id\x18\x01 \x01(\x03\x12\x15\n\rresponse_data\x18\x02 \x01(\t2\xce\x01\n\x0eGRPCTestServer\x12#\n\x0cSimpleMethod\x12\x08.Request\x1a\t.Response\x12.\n\x15\x43lientStreamingMethod\x12\x08.Request\x1a\t.Response(\x01\x12.\n\x15ServerStreamingMethod\x12\x08.Request\x1a\t.Response0\x01\x12\x37\n\x1c\x42idirectionalStreamingMethod\x12\x08.Request\x1a\t.Response(\x01\x30\x01\x62\x06proto3', +) + + +_REQUEST = _descriptor.Descriptor( + name="Request", + full_name="Request", + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name="client_id", + full_name="Request.client_id", + index=0, + number=1, + type=3, + cpp_type=2, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + ), + _descriptor.FieldDescriptor( + name="request_data", + full_name="Request.request_data", + index=1, + number=2, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + ), + ], + extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto3", + extension_ranges=[], + oneofs=[], + serialized_start=21, + serialized_end=71, +) + + +_RESPONSE = _descriptor.Descriptor( + name="Response", + full_name="Response", + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name="server_id", + full_name="Response.server_id", + index=0, + number=1, + type=3, + cpp_type=2, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + ), + _descriptor.FieldDescriptor( + name="response_data", + full_name="Response.response_data", + index=1, + number=2, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + ), + ], + extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto3", + extension_ranges=[], + oneofs=[], + serialized_start=73, + serialized_end=125, +) + +DESCRIPTOR.message_types_by_name["Request"] = _REQUEST +DESCRIPTOR.message_types_by_name["Response"] = _RESPONSE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Request = _reflection.GeneratedProtocolMessageType( + "Request", + (_message.Message,), + { + "DESCRIPTOR": _REQUEST, + "__module__": "test_server_pb2" + # @@protoc_insertion_point(class_scope:Request) + }, +) +_sym_db.RegisterMessage(Request) + +Response = _reflection.GeneratedProtocolMessageType( + "Response", + (_message.Message,), + { + "DESCRIPTOR": _RESPONSE, + "__module__": "test_server_pb2" + # @@protoc_insertion_point(class_scope:Response) + }, +) +_sym_db.RegisterMessage(Response) + + +_GRPCTESTSERVER = _descriptor.ServiceDescriptor( + name="GRPCTestServer", + full_name="GRPCTestServer", + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=128, + serialized_end=334, + methods=[ + _descriptor.MethodDescriptor( + name="SimpleMethod", + full_name="GRPCTestServer.SimpleMethod", + index=0, + containing_service=None, + input_type=_REQUEST, + output_type=_RESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name="ClientStreamingMethod", + full_name="GRPCTestServer.ClientStreamingMethod", + index=1, + containing_service=None, + input_type=_REQUEST, + output_type=_RESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name="ServerStreamingMethod", + full_name="GRPCTestServer.ServerStreamingMethod", + index=2, + containing_service=None, + input_type=_REQUEST, + output_type=_RESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name="BidirectionalStreamingMethod", + full_name="GRPCTestServer.BidirectionalStreamingMethod", + index=3, + containing_service=None, + input_type=_REQUEST, + output_type=_RESPONSE, + serialized_options=None, + ), + ], +) +_sym_db.RegisterServiceDescriptor(_GRPCTESTSERVER) + +DESCRIPTOR.services_by_name["GRPCTestServer"] = _GRPCTESTSERVER + +# @@protoc_insertion_point(module_scope) diff --git a/ext/opentelemetry-ext-grpc/tests/protobuf/test_server_pb2_grpc.py b/ext/opentelemetry-ext-grpc/tests/protobuf/test_server_pb2_grpc.py new file mode 100644 index 0000000000..d0a6fd5184 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/tests/protobuf/test_server_pb2_grpc.py @@ -0,0 +1,205 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +from tests.protobuf import test_server_pb2 as test__server__pb2 + + +class GRPCTestServerStub(object): + """Missing associated documentation comment in .proto file""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SimpleMethod = channel.unary_unary( + "/GRPCTestServer/SimpleMethod", + request_serializer=test__server__pb2.Request.SerializeToString, + response_deserializer=test__server__pb2.Response.FromString, + ) + self.ClientStreamingMethod = channel.stream_unary( + "/GRPCTestServer/ClientStreamingMethod", + request_serializer=test__server__pb2.Request.SerializeToString, + response_deserializer=test__server__pb2.Response.FromString, + ) + self.ServerStreamingMethod = channel.unary_stream( + "/GRPCTestServer/ServerStreamingMethod", + request_serializer=test__server__pb2.Request.SerializeToString, + response_deserializer=test__server__pb2.Response.FromString, + ) + self.BidirectionalStreamingMethod = channel.stream_stream( + "/GRPCTestServer/BidirectionalStreamingMethod", + request_serializer=test__server__pb2.Request.SerializeToString, + response_deserializer=test__server__pb2.Response.FromString, + ) + + +class GRPCTestServerServicer(object): + """Missing associated documentation comment in .proto file""" + + def SimpleMethod(self, request, context): + """Missing associated documentation comment in .proto file""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def ClientStreamingMethod(self, request_iterator, context): + """Missing associated documentation comment in .proto file""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def ServerStreamingMethod(self, request, context): + """Missing associated documentation comment in .proto file""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def BidirectionalStreamingMethod(self, request_iterator, context): + """Missing associated documentation comment in .proto file""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_GRPCTestServerServicer_to_server(servicer, server): + rpc_method_handlers = { + "SimpleMethod": grpc.unary_unary_rpc_method_handler( + servicer.SimpleMethod, + request_deserializer=test__server__pb2.Request.FromString, + response_serializer=test__server__pb2.Response.SerializeToString, + ), + "ClientStreamingMethod": grpc.stream_unary_rpc_method_handler( + servicer.ClientStreamingMethod, + request_deserializer=test__server__pb2.Request.FromString, + response_serializer=test__server__pb2.Response.SerializeToString, + ), + "ServerStreamingMethod": grpc.unary_stream_rpc_method_handler( + servicer.ServerStreamingMethod, + request_deserializer=test__server__pb2.Request.FromString, + response_serializer=test__server__pb2.Response.SerializeToString, + ), + "BidirectionalStreamingMethod": grpc.stream_stream_rpc_method_handler( + servicer.BidirectionalStreamingMethod, + request_deserializer=test__server__pb2.Request.FromString, + response_serializer=test__server__pb2.Response.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "GRPCTestServer", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + + +# This class is part of an EXPERIMENTAL API. +class GRPCTestServer(object): + """Missing associated documentation comment in .proto file""" + + @staticmethod + def SimpleMethod( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/GRPCTestServer/SimpleMethod", + test__server__pb2.Request.SerializeToString, + test__server__pb2.Response.FromString, + options, + channel_credentials, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def ClientStreamingMethod( + request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.stream_unary( + request_iterator, + target, + "/GRPCTestServer/ClientStreamingMethod", + test__server__pb2.Request.SerializeToString, + test__server__pb2.Response.FromString, + options, + channel_credentials, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def ServerStreamingMethod( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_stream( + request, + target, + "/GRPCTestServer/ServerStreamingMethod", + test__server__pb2.Request.SerializeToString, + test__server__pb2.Response.FromString, + options, + channel_credentials, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def BidirectionalStreamingMethod( + request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.stream_stream( + request_iterator, + target, + "/GRPCTestServer/BidirectionalStreamingMethod", + test__server__pb2.Request.SerializeToString, + test__server__pb2.Response.FromString, + options, + channel_credentials, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py b/ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py new file mode 100644 index 0000000000..47dc9fa0bb --- /dev/null +++ b/ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py @@ -0,0 +1,139 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc + +import opentelemetry.ext.grpc +from opentelemetry import metrics, trace +from opentelemetry.ext.grpc import client_interceptor +from opentelemetry.ext.grpc.grpcext import intercept_channel +from opentelemetry.sdk.metrics.export.controller import PushController +from opentelemetry.test.test_base import TestBase +from tests.protobuf import test_server_pb2_grpc + +from ._client import ( + bidirectional_streaming_method, + client_streaming_method, + server_streaming_method, + simple_method, +) +from ._server import create_test_server + + +class TestClientProto(TestBase): + def setUp(self): + super().setUp() + self.server = create_test_server(25565) + self.server.start() + meter = metrics.get_meter(__name__) + interceptor = client_interceptor() + self.channel = intercept_channel( + grpc.insecure_channel("localhost:25565"), interceptor + ) + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) + + self._controller = PushController( + meter, self.memory_metrics_exporter, 30 + ) + + def tearDown(self): + super().tearDown() + self.memory_metrics_exporter.clear() + self.server.stop(None) + + def test_unary_stream(self): + server_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info(span, opentelemetry.ext.grpc) + + def test_stream_unary(self): + client_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ClientStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info(span, opentelemetry.ext.grpc) + + def test_stream_stream(self): + bidirectional_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual( + span.name, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info(span, opentelemetry.ext.grpc) + + def test_error_simple(self): + with self.assertRaises(grpc.RpcError): + simple_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual( + span.status.canonical_code.value, + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) + + def test_error_stream_unary(self): + with self.assertRaises(grpc.RpcError): + client_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual( + span.status.canonical_code.value, + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) + + def test_error_unary_stream(self): + with self.assertRaises(grpc.RpcError): + server_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual( + span.status.canonical_code.value, + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) + + def test_error_stream_stream(self): + with self.assertRaises(grpc.RpcError): + bidirectional_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual( + span.status.canonical_code.value, + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) diff --git a/tox.ini b/tox.ini index ea3a5c2913..2c87f69246 100644 --- a/tox.ini +++ b/tox.ini @@ -142,7 +142,7 @@ envlist = pypy3-test-core-opentracing-shim ; opentelemetry-ext-grpc - py3{4,5,6,7,8}-test-instrumentation-grpc + py3{5,6,7,8}-test-instrumentation-grpc ; opentelemetry-ext-sqlalchemy py3{4,5,6,7,8}-test-instrumentation-sqlalchemy