From 5ff9600a62f9c82fc120b03e5729a53341465626 Mon Sep 17 00:00:00 2001 From: Aravin <34178459+aravinsiva@users.noreply.github.com> Date: Wed, 22 Jul 2020 12:27:56 -0400 Subject: [PATCH] Adding gRPC instrumentor (#788) --- ext/opentelemetry-ext-grpc/CHANGELOG.md | 2 + ext/opentelemetry-ext-grpc/setup.cfg | 5 + .../src/opentelemetry/ext/grpc/__init__.py | 141 +++++++++++++++++- .../tests/test_server_interceptor.py | 58 ++++++- 4 files changed, 203 insertions(+), 3 deletions(-) diff --git a/ext/opentelemetry-ext-grpc/CHANGELOG.md b/ext/opentelemetry-ext-grpc/CHANGELOG.md index 3be32e05e5..4221ab5371 100644 --- a/ext/opentelemetry-ext-grpc/CHANGELOG.md +++ b/ext/opentelemetry-ext-grpc/CHANGELOG.md @@ -4,6 +4,8 @@ - Add status code to gRPC client spans ([896](https://github.com/open-telemetry/opentelemetry-python/pull/896)) +- Add gRPC client and server instrumentors + ([788](https://github.com/open-telemetry/opentelemetry-python/pull/788)) ## 0.8b0 diff --git a/ext/opentelemetry-ext-grpc/setup.cfg b/ext/opentelemetry-ext-grpc/setup.cfg index 2721a3b257..dd29603247 100644 --- a/ext/opentelemetry-ext-grpc/setup.cfg +++ b/ext/opentelemetry-ext-grpc/setup.cfg @@ -51,3 +51,8 @@ test = [options.packages.find] where = src + +[options.entry_points] +opentelemetry_instrumentor = + grpc_client = opentelemetry.ext.grpc:GrpcInstrumentorClient + grpc_server = opentelemetry.ext.grpc:GrpcInstrumentorServer diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py index 0e9b19ef51..368ae55f2e 100644 --- a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py @@ -12,13 +12,150 @@ # See the License for the specific language governing permissions and # limitations under the License. -# pylint:disable=import-outside-toplevel -# pylint:disable=import-self # pylint:disable=no-name-in-module # pylint:disable=relative-beyond-top-level +# pylint:disable=import-error +# pylint:disable=no-self-use +""" +Usage Client +------------ +.. code-block:: python + + import logging + + import grpc + + from opentelemetry import trace + from opentelemetry.ext.grpc import GrpcInstrumentorClient, client_interceptor + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleExportSpanProcessor, + ) + + try: + from .gen import helloworld_pb2, helloworld_pb2_grpc + except ImportError: + from gen import helloworld_pb2, helloworld_pb2_grpc + + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(ConsoleSpanExporter()) + ) + instrumentor = GrpcInstrumentorClient() + instrumentor.instrument() + + def run(): + with grpc.insecure_channel("localhost:50051") as channel: + + stub = helloworld_pb2_grpc.GreeterStub(channel) + response = stub.SayHello(helloworld_pb2.HelloRequest(name="YOU")) + + print("Greeter client received: " + response.message) + + + if __name__ == "__main__": + logging.basicConfig() + run() + +Usage Server +------------ +.. code-block:: python + + import logging + from concurrent import futures + + import grpc + + from opentelemetry import trace + from opentelemetry.ext.grpc import GrpcInstrumentorServer, server_interceptor + from opentelemetry.ext.grpc.grpcext import intercept_server + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleExportSpanProcessor, + ) + + try: + from .gen import helloworld_pb2, helloworld_pb2_grpc + except ImportError: + from gen import helloworld_pb2, helloworld_pb2_grpc + + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(ConsoleSpanExporter()) + ) + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() + + + class Greeter(helloworld_pb2_grpc.GreeterServicer): + def SayHello(self, request, context): + return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name) + + + def serve(): + + server = grpc.server(futures.ThreadPoolExecutor()) + + helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) + server.add_insecure_port("[::]:50051") + server.start() + server.wait_for_termination() + + + if __name__ == "__main__": + logging.basicConfig() + serve() +""" +from contextlib import contextmanager + +import grpc +from wrapt import wrap_function_wrapper as _wrap from opentelemetry import trace +from opentelemetry.ext.grpc.grpcext import intercept_channel, intercept_server from opentelemetry.ext.grpc.version import __version__ +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import unwrap + +# pylint:disable=import-outside-toplevel +# pylint:disable=import-self +# pylint:disable=unused-argument +# isort:skip + + +class GrpcInstrumentorServer(BaseInstrumentor): + def _instrument(self, **kwargs): + _wrap("grpc", "server", self.wrapper_fn) + + def _uninstrument(self, **kwargs): + unwrap(grpc, "server") + + def wrapper_fn(self, original_func, instance, args, kwargs): + server = original_func(*args, **kwargs) + return intercept_server(server, server_interceptor()) + + +class GrpcInstrumentorClient(BaseInstrumentor): + def _instrument(self, **kwargs): + if kwargs.get("channel_type") == "secure": + _wrap("grpc", "secure_channel", self.wrapper_fn) + + else: + _wrap("grpc", "insecure_channel", self.wrapper_fn) + + def _uninstrument(self, **kwargs): + if kwargs.get("channel_type") == "secure": + unwrap(grpc, "secure_channel") + + else: + unwrap(grpc, "insecure_channel") + + @contextmanager + def wrapper_fn(self, original_func, instance, args, kwargs): + with original_func(*args, **kwargs) as channel: + yield intercept_channel(channel, client_interceptor()) def client_interceptor(tracer_provider=None): diff --git a/ext/opentelemetry-ext-grpc/tests/test_server_interceptor.py b/ext/opentelemetry-ext-grpc/tests/test_server_interceptor.py index ebe2a8c160..0ba57a4322 100644 --- a/ext/opentelemetry-ext-grpc/tests/test_server_interceptor.py +++ b/ext/opentelemetry-ext-grpc/tests/test_server_interceptor.py @@ -22,7 +22,7 @@ import opentelemetry.ext.grpc from opentelemetry import trace -from opentelemetry.ext.grpc import server_interceptor +from opentelemetry.ext.grpc import GrpcInstrumentorServer, server_interceptor from opentelemetry.ext.grpc.grpcext import intercept_server from opentelemetry.sdk import trace as trace_sdk from opentelemetry.test.test_base import TestBase @@ -49,6 +49,62 @@ def service(self, handler_call_details): class TestOpenTelemetryServerInterceptor(TestBase): + def test_instrumentor(self): + def handler(request, context): + return b"" + + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + channel.unary_unary("test")(b"test") + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "test") + self.assertIs(span.kind, trace.SpanKind.SERVER) + self.check_span_instrumentation_info(span, opentelemetry.ext.grpc) + grpc_server_instrumentor.uninstrument() + + def test_uninstrument(self): + def handler(request, context): + return b"" + + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() + grpc_server_instrumentor.uninstrument() + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + channel.unary_unary("test")(b"test") + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + def test_create_span(self): """Check that the interceptor wraps calls with spans server-side."""