diff --git a/packages/opentelemetry-instrumentation-grpc/.eslintignore b/packages/opentelemetry-instrumentation-grpc/.eslintignore new file mode 100644 index 0000000000..378eac25d3 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/.eslintignore @@ -0,0 +1 @@ +build diff --git a/packages/opentelemetry-instrumentation-grpc/.eslintrc.js b/packages/opentelemetry-instrumentation-grpc/.eslintrc.js new file mode 100644 index 0000000000..f726f3becb --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/.eslintrc.js @@ -0,0 +1,7 @@ +module.exports = { + "env": { + "mocha": true, + "node": true + }, + ...require('../../eslint.config.js') +} diff --git a/packages/opentelemetry-instrumentation-grpc/.npmignore b/packages/opentelemetry-instrumentation-grpc/.npmignore new file mode 100644 index 0000000000..9505ba9450 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/.npmignore @@ -0,0 +1,4 @@ +/bin +/coverage +/doc +/test diff --git a/packages/opentelemetry-instrumentation-grpc/LICENSE b/packages/opentelemetry-instrumentation-grpc/LICENSE new file mode 100644 index 0000000000..261eeb9e9f --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/packages/opentelemetry-instrumentation-grpc/README.md b/packages/opentelemetry-instrumentation-grpc/README.md new file mode 100644 index 0000000000..5f8f30e25d --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/README.md @@ -0,0 +1,75 @@ +# OpenTelemetry gRPC Instrumentation for Node.js + +[![Gitter chat][gitter-image]][gitter-url] +[![NPM Published Version][npm-img]][npm-url] +[![dependencies][dependencies-image]][dependencies-url] +[![devDependencies][devDependencies-image]][devDependencies-url] +[![Apache License][license-image]][license-image] + +This module provides automatic instrumentation for [`grpc`](https://grpc.github.io/grpc/node/). Currently, version [`1.x`](https://www.npmjs.com/package/grpc?activeTab=versions) of the Node.js gRPC library is supported. + +For automatic instrumentation see the +[@opentelemetry/node](https://github.com/open-telemetry/opentelemetry-js/tree/master/packages/opentelemetry-node) package. + +## Installation + +```sh +npm install --save @opentelemetry/instrumentation-grpc +``` + +## Usage + +OpenTelemetry gRPC Instrumentation allows the user to automatically collect trace data and export them to the backend of choice, to give observability to distributed systems when working with [gRPC](https://www.npmjs.com/package/grpc). + +To load a specific instrumentation (**gRPC** in this case), specify it in the Node Tracer's configuration. + +```javascript +const { NodeTracerProvider } = require('@opentelemetry/node'); +const { GrpcInstrumentation } = require('@opentelemetry/instrumentation-grpc'); + +const provider = new NodeTracerProvider({ + // be sure to disable old plugin + plugins: { + grpc: { enabled: false, path: '@opentelemetry/plugin-groc' } + }, +}); + +const grpcInstrumentation = new GrpcInstrumentation({ + // see under for available configuration +}); +grpcInstrumentation.enable(); + +provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter())); +provider.register(); +``` + +See [examples/grpc](https://github.com/open-telemetry/opentelemetry-js/tree/master/examples/grpc) for a short example. + +### gRPC Instrumentation Options + +gRPC instrumentation accepts the following configuration: + +| Options | Type | Description | +| ------- | ---- | ----------- | +| [`ignoreGrpcMethods`](https://github.com/open-telemetry/opentelemetry-js/blob/master/packages/opentelemetry-instrumentation-grpc/src/types.ts#L32) | `IgnoreMatcher[]` | gRPC instrumentation will not trace any methods that match anything in this list. You may pass a string (case-insensitive match), a `RegExp` object, or a filter function. | + +## Useful links + +- For more information on OpenTelemetry, visit: +- For more about OpenTelemetry JavaScript: +- For help or feedback on this project, join us on [gitter][gitter-url] + +## License + +Apache 2.0 - See [LICENSE][license-url] for more information. + +[gitter-image]: https://badges.gitter.im/open-telemetry/opentelemetry-js.svg +[gitter-url]: https://gitter.im/open-telemetry/opentelemetry-node?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge +[license-url]: https://github.com/open-telemetry/opentelemetry-js/blob/master/LICENSE +[license-image]: https://img.shields.io/badge/license-Apache_2.0-green.svg?style=flat +[dependencies-image]: https://david-dm.org/open-telemetry/opentelemetry-js/status.svg?path=packages/opentelemetry-instrumentation-grpc +[dependencies-url]: https://david-dm.org/open-telemetry/opentelemetry-js?path=packages%2Fopentelemetry-instrumentation-grpc +[devDependencies-image]: https://david-dm.org/open-telemetry/opentelemetry-js/dev-status.svg?path=packages/opentelemetry-instrumentation-grpc +[devDependencies-url]: https://david-dm.org/open-telemetry/opentelemetry-js?path=packages%2Fopentelemetry-instrumentation-grpc&type=dev +[npm-url]: https://www.npmjs.com/package/@opentelemetry/instrumentation-grpc +[npm-img]: https://badge.fury.io/js/%40opentelemetry%2Finstrumentation-grpc.svg diff --git a/packages/opentelemetry-instrumentation-grpc/package.json b/packages/opentelemetry-instrumentation-grpc/package.json new file mode 100644 index 0000000000..1be36e2fb7 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/package.json @@ -0,0 +1,74 @@ +{ + "name": "@opentelemetry/instrumentation-grpc", + "version": "0.13.0", + "description": "OpenTelemetry grpc automatic instrumentation package.", + "main": "build/src/index.js", + "types": "build/src/index.d.ts", + "repository": "open-telemetry/opentelemetry-js", + "scripts": { + "compile": "tsc --build", + "clean": "tsc --build --clean", + "test": "nyc ts-mocha -p tsconfig.json test/**/*.test.ts", + "tdd": "npm run test -- --watch-extensions ts --watch", + "lint": "eslint . --ext .ts", + "lint:fix": "eslint . --ext .ts --fix", + "codecov": "nyc report --reporter=json && codecov -f coverage/*.json -p ../../", + "version": "node ../../scripts/version-update.js", + "watch": "tsc --build --watch" + }, + "keywords": [ + "opentelemetry", + "grpc", + "nodejs", + "tracing", + "profiling", + "instrumentation" + ], + "author": "OpenTelemetry Authors", + "license": "Apache-2.0", + "engines": { + "node": ">=8.0.0" + }, + "files": [ + "build/src/**/*.js", + "build/src/**/*.js.map", + "build/src/**/*.d.ts", + "doc", + "LICENSE", + "README.md" + ], + "publishConfig": { + "access": "public" + }, + "devDependencies": { + "@grpc/grpc-js": "^1.2.2", + "@grpc/proto-loader": "^0.5.5", + "@opentelemetry/context-async-hooks": "^0.14.0", + "@opentelemetry/context-base": "^0.14.0", + "@opentelemetry/core": "^0.14.0", + "@opentelemetry/node": "^0.14.0", + "@opentelemetry/tracing": "^0.14.0", + "@types/mocha": "8.0.4", + "@types/node": "14.14.10", + "@types/semver": "7.3.4", + "@types/shimmer": "1.0.1", + "@types/sinon": "9.0.9", + "codecov": "3.8.1", + "grpc": "1.24.4", + "gts": "2.0.2", + "mocha": "7.2.0", + "node-pre-gyp": "0.16.0", + "nyc": "15.1.0", + "rimraf": "3.0.2", + "semver": "7.3.2", + "sinon": "9.2.1", + "ts-mocha": "8.0.0", + "ts-node": "9.0.0", + "typescript": "3.9.7" + }, + "dependencies": { + "@opentelemetry/api": "^0.14.0", + "@opentelemetry/instrumentation": "^0.14.0", + "@opentelemetry/semantic-conventions": "^0.14.0" + } +} diff --git a/packages/opentelemetry-instrumentation-grpc/src/grpc.ts b/packages/opentelemetry-instrumentation-grpc/src/grpc.ts new file mode 100644 index 0000000000..f2ab0f9ada --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/grpc.ts @@ -0,0 +1,581 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + StatusCode, + context, + propagation, + Span, + SpanKind, + SpanOptions, + Status, + setSpan, +} from '@opentelemetry/api'; +import { RpcAttribute } from '@opentelemetry/semantic-conventions'; +import { + InstrumentationBase, + InstrumentationConfig, + InstrumentationNodeModuleDefinition, + InstrumentationNodeModuleFile, + isWrapped, +} from '@opentelemetry/instrumentation'; +import type * as events from 'events'; +import type * as grpcTypes from 'grpc'; +import { + GrpcClientFunc, + GrpcInternalClientTypes, + GrpcInstrumentationConfig, + SendUnaryDataCallback, + ServerCallWithMeta, +} from './types'; +import { + findIndex, + _grpcStatusCodeToOpenTelemetryStatusCode, + _grpcStatusCodeToSpanStatus, + _methodIsIgnored, +} from './utils'; +import { VERSION } from './version'; + +/** The metadata key under which span context is stored as a binary value. */ +export const GRPC_TRACE_KEY = 'grpc-trace-bin'; + +/** + * Holding reference to grpc module here to access constant of grpc modules + * instead of just requiring it avoid directly depending on grpc itself. + */ +let grpcClient: typeof grpcTypes; + +export class GrpcInstrumentation extends InstrumentationBase { + constructor( + protected _config: GrpcInstrumentationConfig & InstrumentationConfig = {} + ) { + super('@opentelemetry/instrumentation-grpc', VERSION, _config); + } + + public setConfig( + config: GrpcInstrumentationConfig & InstrumentationConfig = {} + ) { + this._config = Object.assign({}, config); + } + + init() { + return [ + new InstrumentationNodeModuleDefinition( + 'grpc', + ['1.*'], + (moduleExports, version) => { + this._logger.debug(`Applying patch for grpc@${version}`); + grpcClient = moduleExports; + if (isWrapped(moduleExports.Server.prototype.register)) { + this._unwrap(moduleExports.Server.prototype, 'register'); + } + this._wrap( + moduleExports.Server.prototype, + 'register', + this._patchServer() as any + ); + // Wrap the externally exported client constructor + if (isWrapped(moduleExports.makeGenericClientConstructor)) { + this._unwrap(moduleExports, 'makeGenericClientConstructor'); + } + this._wrap( + moduleExports, + 'makeGenericClientConstructor', + this._patchClient() + ); + return moduleExports; + }, + (moduleExports, version) => { + if (moduleExports === undefined) return; + this._logger.debug(`Removing patch for grpc@${version}`); + + this._unwrap(moduleExports.Server.prototype, 'register'); + }, + this._getInternalPatchs() + ), + ]; + } + + private _getInternalPatchs() { + const onPatch = ( + moduleExports: GrpcInternalClientTypes, + version?: string + ) => { + this._logger.debug(`Applying internal patch for grpc@${version}`); + if (isWrapped(moduleExports.makeClientConstructor)) { + this._unwrap(moduleExports, 'makeClientConstructor'); + } + this._wrap(moduleExports, 'makeClientConstructor', this._patchClient()); + return moduleExports; + }; + const onUnPatch = ( + moduleExports?: GrpcInternalClientTypes, + version?: string + ) => { + if (moduleExports === undefined) return; + this._logger.debug(`Removing internal patch for grpc@${version}`); + this._unwrap(moduleExports, 'makeClientConstructor'); + }; + return [ + new InstrumentationNodeModuleFile( + 'grpc/src/node/src/client.js', + ['0.13 - 1.6'], + onPatch, + onUnPatch + ), + new InstrumentationNodeModuleFile( + 'grpc/src/client.js', + ['^1.7'], + onPatch, + onUnPatch + ), + ]; + } + + private _setSpanContext(metadata: grpcTypes.Metadata): void { + propagation.inject(context.active(), metadata, { + set: (metadata, k, v) => metadata.set(k, v as grpcTypes.MetadataValue), + }); + } + + private _patchServer() { + return (originalRegister: typeof grpcTypes.Server.prototype.register) => { + const plugin = this; + plugin._logger.debug('patched gRPC server'); + + return function register( + this: grpcTypes.Server & { handlers: any }, + name: string, + handler: grpcTypes.handleCall, + serialize: grpcTypes.serialize, + deserialize: grpcTypes.deserialize, + type: string + ) { + const originalResult = originalRegister.apply(this, arguments as any); + const handlerSet = this.handlers[name]; + + plugin._wrap( + handlerSet, + 'func', + (originalFunc: grpcTypes.handleCall) => { + return function func( + this: typeof handlerSet, + call: ServerCallWithMeta, + callback: SendUnaryDataCallback + ) { + const self = this; + if (plugin._shouldNotTraceServerCall(call, name)) { + switch (type) { + case 'unary': + case 'client_stream': + return (originalFunc as Function).call( + self, + call, + callback + ); + case 'server_stream': + case 'bidi': + return (originalFunc as Function).call(self, call); + default: + return originalResult; + } + } + const spanName = `grpc.${name.replace('/', '')}`; + const spanOptions: SpanOptions = { + kind: SpanKind.SERVER, + }; + + plugin._logger.debug( + 'patch func: %s', + JSON.stringify(spanOptions) + ); + + context.with( + propagation.extract(context.active(), call.metadata, { + get: (metadata, key) => metadata.get(key).map(String), + keys: metadata => Object.keys(metadata.getMap()), + }), + () => { + const span = plugin.tracer + .startSpan(spanName, spanOptions) + .setAttributes({ + [RpcAttribute.GRPC_KIND]: spanOptions.kind, + }); + + context.with(setSpan(context.active(), span), () => { + switch (type) { + case 'unary': + case 'client_stream': + return plugin._clientStreamAndUnaryHandler( + plugin, + span, + call, + callback, + originalFunc, + self + ); + case 'server_stream': + case 'bidi': + return plugin._serverStreamAndBidiHandler( + plugin, + span, + call, + originalFunc, + self + ); + default: + break; + } + }); + } + ); + }; + } + ); + + return originalResult; + }; + }; + } + + /** + * Returns true if the server call should not be traced. + */ + private _shouldNotTraceServerCall( + call: ServerCallWithMeta, + name: string + ): boolean { + const parsedName = name.split('/'); + return _methodIsIgnored( + parsedName[parsedName.length - 1] || name, + this._config.ignoreGrpcMethods + ); + } + + private _clientStreamAndUnaryHandler( + plugin: GrpcInstrumentation, + span: Span, + call: ServerCallWithMeta, + callback: SendUnaryDataCallback, + original: + | grpcTypes.handleCall + | grpcTypes.ClientReadableStream, + self: {} + ) { + function patchedCallback( + err: grpcTypes.ServiceError, + value: any, + trailer: grpcTypes.Metadata, + flags: grpcTypes.writeFlags + ) { + if (err) { + if (err.code) { + span.setStatus({ + code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), + message: err.message, + }); + span.setAttribute(RpcAttribute.GRPC_STATUS_CODE, err.code.toString()); + } + span.setAttributes({ + [RpcAttribute.GRPC_ERROR_NAME]: err.name, + [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, + }); + } else { + span.setStatus({ code: StatusCode.UNSET }); + span.setAttribute( + RpcAttribute.GRPC_STATUS_CODE, + grpcClient.status.OK.toString() + ); + } + span.addEvent('received'); + + // end the span + span.end(); + return callback(err, value, trailer, flags); + } + + context.bind(call); + return (original as Function).call(self, call, patchedCallback); + } + + private _serverStreamAndBidiHandler( + plugin: GrpcInstrumentation, + span: Span, + call: ServerCallWithMeta, + original: grpcTypes.handleCall, + self: {} + ) { + let spanEnded = false; + const endSpan = () => { + if (!spanEnded) { + spanEnded = true; + span.end(); + } + }; + + context.bind(call); + call.on('finish', () => { + span.setStatus(_grpcStatusCodeToSpanStatus(call.status.code)); + span.setAttribute( + RpcAttribute.GRPC_STATUS_CODE, + call.status.code.toString() + ); + + // if there is an error, span will be ended on error event, otherwise end it here + if (call.status.code === 0) { + span.addEvent('finished'); + endSpan(); + } + }); + + call.on('error', (err: grpcTypes.ServiceError) => { + span.setStatus({ + code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), + message: err.message, + }); + span.addEvent('finished with error'); + span.setAttributes({ + [RpcAttribute.GRPC_ERROR_NAME]: err.name, + [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, + }); + endSpan(); + }); + + return (original as any).call(self, call); + } + + private _patchClient() { + const plugin = this; + return (original: typeof grpcTypes.makeGenericClientConstructor): never => { + plugin._logger.debug('patching client'); + return function makeClientConstructor( + this: typeof grpcTypes.Client, + methods: { [key: string]: { originalName?: string } }, + _serviceName: string, + _options: grpcTypes.GenericClientOptions + ) { + const client = original.apply(this, arguments as any); + plugin._massWrap( + client.prototype as never, + plugin._getMethodsToWrap(client, methods) as never[], + plugin._getPatchedClientMethods() as any + ); + return client; + } as never; + }; + } + + private _getMethodsToWrap( + client: typeof grpcTypes.Client, + methods: { [key: string]: { originalName?: string } } + ): string[] { + const methodList: string[] = []; + + // For a method defined in .proto as "UnaryMethod" + Object.entries(methods).forEach(([name, { originalName }]) => { + if (!_methodIsIgnored(name, this._config.ignoreGrpcMethods)) { + methodList.push(name); // adds camel case method name: "unaryMethod" + if ( + originalName && + // eslint-disable-next-line no-prototype-builtins + client.prototype.hasOwnProperty(originalName) && + name !== originalName // do not add duplicates + ) { + // adds original method name: "UnaryMethod", + methodList.push(originalName); + } + } + }); + return methodList; + } + + private _getPatchedClientMethods() { + const plugin = this; + return (original: GrpcClientFunc) => { + plugin._logger.debug('patch all client methods'); + return function clientMethodTrace(this: grpcTypes.Client) { + const name = `grpc.${original.path.replace('/', '')}`; + const args = Array.prototype.slice.call(arguments); + const metadata = plugin._getMetadata(original, args); + const span = plugin.tracer.startSpan(name, { + kind: SpanKind.CLIENT, + }); + return context.with(setSpan(context.active(), span), () => + plugin._makeGrpcClientRemoteCall(original, args, metadata, this)(span) + ); + }; + }; + } + + /** + * This method handles the client remote call + */ + private _makeGrpcClientRemoteCall( + original: GrpcClientFunc, + args: any[], + metadata: grpcTypes.Metadata, + self: grpcTypes.Client + ) { + /** + * Patches a callback so that the current span for this trace is also ended + * when the callback is invoked. + */ + function patchedCallback( + span: Span, + callback: SendUnaryDataCallback, + _metadata: grpcTypes.Metadata + ) { + const wrappedFn = (err: grpcTypes.ServiceError, res: any) => { + if (err) { + if (err.code) { + span.setStatus(_grpcStatusCodeToSpanStatus(err.code)); + span.setAttribute( + RpcAttribute.GRPC_STATUS_CODE, + err.code.toString() + ); + } + span.setAttributes({ + [RpcAttribute.GRPC_ERROR_NAME]: err.name, + [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, + }); + } else { + span.setStatus({ code: StatusCode.UNSET }); + span.setAttribute( + RpcAttribute.GRPC_STATUS_CODE, + grpcClient.status.OK.toString() + ); + } + + span.end(); + callback(err, res); + }; + return context.bind(wrappedFn); + } + + return (span: Span) => { + if (!span) { + return original.apply(self, args); + } + + // if unary or clientStream + if (!original.responseStream) { + const callbackFuncIndex = findIndex(args, arg => { + return typeof arg === 'function'; + }); + if (callbackFuncIndex !== -1) { + args[callbackFuncIndex] = patchedCallback( + span, + args[callbackFuncIndex], + metadata + ); + } + } + + span.addEvent('sent'); + span.setAttributes({ + [RpcAttribute.GRPC_METHOD]: original.path, + [RpcAttribute.GRPC_KIND]: SpanKind.CLIENT, + }); + + this._setSpanContext(metadata); + const call = original.apply(self, args); + + // if server stream or bidi + if (original.responseStream) { + // Both error and status events can be emitted + // the first one emitted set spanEnded to true + let spanEnded = false; + const endSpan = () => { + if (!spanEnded) { + span.end(); + spanEnded = true; + } + }; + context.bind(call); + ((call as unknown) as events.EventEmitter).on( + 'error', + (err: grpcTypes.ServiceError) => { + span.setStatus({ + code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), + message: err.message, + }); + span.setAttributes({ + [RpcAttribute.GRPC_ERROR_NAME]: err.name, + [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, + }); + endSpan(); + } + ); + + ((call as unknown) as events.EventEmitter).on( + 'status', + (status: Status) => { + span.setStatus({ code: StatusCode.UNSET }); + span.setAttribute( + RpcAttribute.GRPC_STATUS_CODE, + status.code.toString() + ); + endSpan(); + } + ); + } + return call; + }; + } + + private _getMetadata( + original: GrpcClientFunc, + args: any[] + ): grpcTypes.Metadata { + let metadata: grpcTypes.Metadata; + + // This finds an instance of Metadata among the arguments. + // A possible issue that could occur is if the 'options' parameter from + // the user contains an '_internal_repr' as well as a 'getMap' function, + // but this is an extremely rare case. + let metadataIndex = findIndex(args, (arg: any) => { + return ( + arg && + typeof arg === 'object' && + arg._internal_repr && + typeof arg.getMap === 'function' + ); + }); + if (metadataIndex === -1) { + metadata = new grpcClient.Metadata(); + if (!original.requestStream) { + // unary or server stream + if (args.length === 0) { + // No argument (for the gRPC call) was provided, so we will have to + // provide one, since metadata cannot be the first argument. + // The internal representation of argument defaults to undefined + // in its non-presence. + // Note that we can't pass null instead of undefined because the + // serializer within gRPC doesn't accept it. + args.push(undefined); + } + metadataIndex = 1; + } else { + // client stream or bidi + metadataIndex = 0; + } + args.splice(metadataIndex, 0, metadata); + } else { + metadata = args[metadataIndex]; + } + return metadata; + } +} diff --git a/packages/opentelemetry-instrumentation-grpc/src/index.ts b/packages/opentelemetry-instrumentation-grpc/src/index.ts new file mode 100644 index 0000000000..4ffcf69671 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/index.ts @@ -0,0 +1,17 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export * from './grpc'; diff --git a/packages/opentelemetry-instrumentation-grpc/src/types.ts b/packages/opentelemetry-instrumentation-grpc/src/types.ts new file mode 100644 index 0000000000..14b605d263 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/types.ts @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type * as grpcTypes from 'grpc'; +import * as events from 'events'; +import { InstrumentationConfig } from '@opentelemetry/instrumentation'; + +export type IgnoreMatcher = string | RegExp | ((str: string) => boolean); + +export type SendUnaryDataCallback = ( + error: grpcTypes.ServiceError | null, + value?: any, + trailer?: grpcTypes.Metadata, + flags?: grpcTypes.writeFlags +) => void; + +export interface GrpcInstrumentationConfig extends InstrumentationConfig { + /* Omits tracing on any gRPC methods that match any of + * the IgnoreMatchers in the ignoreGrpcMethods list + */ + ignoreGrpcMethods?: IgnoreMatcher[]; +} + +interface GrpcStatus { + code: number; + details: string; + metadata: grpcTypes.Metadata; +} + +export type ServerCall = + | typeof grpcTypes.ServerUnaryCall + | typeof grpcTypes.ServerReadableStream + | typeof grpcTypes.ServerWritableStream + | typeof grpcTypes.ServerDuplexStream; + +export type ServerCallWithMeta = ServerCall & { + metadata: grpcTypes.Metadata; + status: GrpcStatus; + request?: unknown; +} & events.EventEmitter; + +export type GrpcClientFunc = typeof Function & { + path: string; + requestStream: boolean; + responseStream: boolean; +}; + +export type GrpcInternalClientTypes = { + makeClientConstructor: typeof grpcTypes.makeGenericClientConstructor; +}; diff --git a/packages/opentelemetry-instrumentation-grpc/src/utils.ts b/packages/opentelemetry-instrumentation-grpc/src/utils.ts new file mode 100644 index 0000000000..e81e5424f8 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/utils.ts @@ -0,0 +1,96 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { StatusCode, Status } from '@opentelemetry/api'; +import type * as grpcTypes from 'grpc'; +import { IgnoreMatcher } from './types'; + +// Equivalent to lodash _.findIndex +export const findIndex: (args: T[], fn: (arg: T) => boolean) => number = ( + args, + fn: Function +) => { + let index = -1; + for (const arg of args) { + index++; + if (fn(arg)) { + return index; + } + } + return -1; +}; + +/** + * Convert a grpc status code to an opentelemetry Status code. + * @param status + */ +export const _grpcStatusCodeToOpenTelemetryStatusCode = ( + status?: grpcTypes.status +): StatusCode => { + if (status !== undefined && status === 0) { + return StatusCode.UNSET; + } + return StatusCode.ERROR; +}; + +export const _grpcStatusCodeToSpanStatus = (status: number): Status => { + return { code: _grpcStatusCodeToOpenTelemetryStatusCode(status) }; +}; + +/** + * Returns true if methodName matches pattern + * @param methodName the name of the method + * @param pattern Match pattern + */ +const _satisfiesPattern = ( + methodName: string, + pattern: IgnoreMatcher +): boolean => { + if (typeof pattern === 'string') { + return pattern.toLowerCase() === methodName.toLowerCase(); + } else if (pattern instanceof RegExp) { + return pattern.test(methodName); + } else if (typeof pattern === 'function') { + return pattern(methodName); + } else { + return false; + } +}; + +/** + * Returns true if the current plugin configuration + * ignores the given method. + * @param methodName the name of the method + * @param ignoredMethods a list of matching patterns + * @param onException an error handler for matching exceptions + */ +export const _methodIsIgnored = ( + methodName: string, + ignoredMethods?: IgnoreMatcher[] +): boolean => { + if (!ignoredMethods) { + // No ignored gRPC methods + return false; + } + + for (const pattern of ignoredMethods) { + if (_satisfiesPattern(methodName, pattern)) { + return true; + } + } + + return false; +}; diff --git a/packages/opentelemetry-instrumentation-grpc/src/version.ts b/packages/opentelemetry-instrumentation-grpc/src/version.ts new file mode 100644 index 0000000000..bc552fd543 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/version.ts @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// this is autogenerated file, see scripts/version-update.js +export const VERSION = '0.14.0'; diff --git a/packages/opentelemetry-instrumentation-grpc/test/fixtures/grpc-test.proto b/packages/opentelemetry-instrumentation-grpc/test/fixtures/grpc-test.proto new file mode 100644 index 0000000000..4949dd5e0d --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/test/fixtures/grpc-test.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package pkg_test; + +service GrpcTester { + rpc UnaryMethod (TestRequest) returns (TestReply) {} + rpc camelCaseMethod (TestRequest) returns (TestReply) {} + rpc ClientStreamMethod (stream TestRequest) returns (TestReply) {} + rpc ServerStreamMethod (TestRequest) returns (stream TestReply) {} + rpc BidiStreamMethod (stream TestRequest) returns (stream TestReply) {} +} + +message TestRequest { + int32 num = 1; +} + +message TestReply { + int32 num = 1; +} diff --git a/packages/opentelemetry-instrumentation-grpc/test/grpc.test.ts b/packages/opentelemetry-instrumentation-grpc/test/grpc.test.ts new file mode 100644 index 0000000000..1636c7f4b8 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/test/grpc.test.ts @@ -0,0 +1,28 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { runTests } from './helper'; +import { GrpcInstrumentation } from '../src/grpc'; + +const instrumentation = new GrpcInstrumentation(); +instrumentation.enable(); +instrumentation.disable(); + +import * as grpc from 'grpc'; + +describe('#grpc', () => { + runTests(instrumentation, 'grpc', grpc, 12345); +}); diff --git a/packages/opentelemetry-instrumentation-grpc/test/helper.ts b/packages/opentelemetry-instrumentation-grpc/test/helper.ts new file mode 100644 index 0000000000..768aeeeeb7 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/test/helper.ts @@ -0,0 +1,812 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + context, + SpanKind, + propagation, + NoopLogger, + setSpan, + getSpan, +} from '@opentelemetry/api'; +import { HttpTraceContext } from '@opentelemetry/core'; +import { NodeTracerProvider } from '@opentelemetry/node'; +import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks'; +import { ContextManager } from '@opentelemetry/context-base'; +import { + InMemorySpanExporter, + SimpleSpanProcessor, +} from '@opentelemetry/tracing'; +import * as assert from 'assert'; +import * as protoLoader from '@grpc/proto-loader'; +import type * as grpcNapi from 'grpc'; +import type * as grpcJs from '@grpc/grpc-js'; +import { assertPropagation, assertSpan } from './utils/assertionUtils'; +import { promisify } from 'util'; +import type { GrpcInstrumentation } from '../src'; +import * as path from 'path'; + +const PROTO_PATH = path.resolve(__dirname, './fixtures/grpc-test.proto'); +const memoryExporter = new InMemorySpanExporter(); + +const options = { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, +}; + +interface TestRequestResponse { + num: number; +} + +type ServiceError = grpcNapi.ServiceError | grpcJs.ServiceError; +type Client = grpcNapi.Client | grpcJs.Client; +type Server = grpcNapi.Server | grpcJs.Server; +type ServerUnaryCall = + | grpcNapi.ServerUnaryCall + | grpcJs.ServerUnaryCall; +type RequestCallback = grpcJs.requestCallback; +type ServerReadableStream = + | grpcNapi.ServerReadableStream + | grpcJs.ServerReadableStream; +type ServerWriteableStream = + | grpcNapi.ServerWriteableStream + | grpcJs.ServerWritableStream; +type ServerDuplexStream = + | grpcNapi.ServerDuplexStream + | grpcJs.ServerDuplexStream; +type Metadata = grpcNapi.Metadata | grpcJs.Metadata; + +type TestGrpcClient = (typeof grpcJs | typeof grpcNapi)['Client'] & { + unaryMethod: any; + UnaryMethod: any; + camelCaseMethod: any; + clientStreamMethod: any; + serverStreamMethod: any; + bidiStreamMethod: any; +}; + +interface TestGrpcCall { + description: string; + methodName: string; + method: Function; + request: TestRequestResponse | TestRequestResponse[]; + result: TestRequestResponse | TestRequestResponse[]; + metadata?: Metadata; +} + +// Compare two arrays using an equal function f +const arrayIsEqual = (f: any) => ([x, ...xs]: any) => ([y, ...ys]: any): any => + x === undefined && y === undefined + ? true + : Boolean(f(x)(y)) && arrayIsEqual(f)(xs)(ys); + +// Return true if two requests has the same num value +const requestEqual = (x: TestRequestResponse) => (y: TestRequestResponse) => + x.num !== undefined && x.num === y.num; + +// Check if its equal requests or array of requests +const checkEqual = (x: TestRequestResponse | TestRequestResponse[]) => ( + y: TestRequestResponse | TestRequestResponse[] +) => + x instanceof Array && y instanceof Array + ? arrayIsEqual(requestEqual)(x as any)(y as any) + : !(x instanceof Array) && !(y instanceof Array) + ? requestEqual(x)(y) + : false; + +export const runTests = ( + plugin: GrpcInstrumentation, + moduleName: string, + grpc: typeof grpcNapi | typeof grpcJs, + grpcPort: number +) => { + const MAX_ERROR_STATUS = grpc.status.UNAUTHENTICATED; + + const grpcClient = { + unaryMethod: ( + client: TestGrpcClient, + request: TestRequestResponse, + metadata: Metadata = new grpc.Metadata() + ): Promise => { + return new Promise((resolve, reject) => { + return client.unaryMethod( + request, + metadata, + (err: ServiceError, response: TestRequestResponse) => { + if (err) { + reject(err); + } else { + resolve(response); + } + } + ); + }); + }, + + UnaryMethod: ( + client: TestGrpcClient, + request: TestRequestResponse, + metadata: Metadata = new grpc.Metadata() + ): Promise => { + return new Promise((resolve, reject) => { + return client.UnaryMethod( + request, + metadata, + (err: ServiceError, response: TestRequestResponse) => { + if (err) { + reject(err); + } else { + resolve(response); + } + } + ); + }); + }, + + camelCaseMethod: ( + client: TestGrpcClient, + request: TestRequestResponse, + metadata: Metadata = new grpc.Metadata() + ): Promise => { + return new Promise((resolve, reject) => { + return client.camelCaseMethod( + request, + metadata, + (err: ServiceError, response: TestRequestResponse) => { + if (err) { + reject(err); + } else { + resolve(response); + } + } + ); + }); + }, + + clientStreamMethod: ( + client: TestGrpcClient, + request: TestRequestResponse[], + metadata: Metadata = new grpc.Metadata() + ): Promise => { + return new Promise((resolve, reject) => { + const writeStream = client.clientStreamMethod( + metadata, + (err: ServiceError, response: TestRequestResponse) => { + if (err) { + reject(err); + } else { + resolve(response); + } + } + ); + + request.forEach(element => { + writeStream.write(element); + }); + writeStream.end(); + }); + }, + + serverStreamMethod: ( + client: TestGrpcClient, + request: TestRequestResponse, + metadata: Metadata = new grpc.Metadata() + ): Promise => { + return new Promise((resolve, reject) => { + const result: TestRequestResponse[] = []; + const readStream = client.serverStreamMethod(request, metadata); + + readStream.on('data', (data: TestRequestResponse) => { + result.push(data); + }); + readStream.on('error', (err: ServiceError) => { + reject(err); + }); + readStream.on('end', () => { + resolve(result); + }); + }); + }, + + bidiStreamMethod: ( + client: TestGrpcClient, + request: TestRequestResponse[], + metadata: Metadata = new grpc.Metadata() + ): Promise => { + return new Promise((resolve, reject) => { + const result: TestRequestResponse[] = []; + const bidiStream = client.bidiStreamMethod(metadata); + + bidiStream.on('data', (data: TestRequestResponse) => { + result.push(data); + }); + + request.forEach(element => { + bidiStream.write(element); + }); + + bidiStream.on('error', (err: ServiceError) => { + reject(err); + }); + + bidiStream.on('end', () => { + resolve(result); + }); + + bidiStream.end(); + }); + }, + }; + + let server: Server; + let client: Client; + + const replicate = (request: TestRequestResponse) => { + const result: TestRequestResponse[] = []; + for (let i = 0; i < request.num; i++) { + result.push(request); + } + return result; + }; + + async function startServer( + grpc: typeof grpcJs | typeof grpcNapi, + proto: any + ) { + const server = new grpc.Server(); + + function getError(msg: string, code: number): ServiceError | null { + const err: ServiceError = { + ...new Error(msg), + name: msg, + message: msg, + code, + details: msg, + }; + return err; + } + + server.addService(proto.GrpcTester.service, { + // An error is emitted every time + // request.num <= MAX_ERROR_STATUS = (status.UNAUTHENTICATED) + // in those cases, erro.code = request.num + + // This method returns the request + unaryMethod(call: ServerUnaryCall, callback: RequestCallback) { + call.request.num <= MAX_ERROR_STATUS + ? callback( + getError( + 'Unary Method Error', + call.request.num + ) as grpcJs.ServiceError + ) + : callback(null, { num: call.request.num }); + }, + + // This method returns the request + camelCaseMethod(call: ServerUnaryCall, callback: RequestCallback) { + call.request.num <= MAX_ERROR_STATUS + ? callback( + getError( + 'Unary Method Error', + call.request.num + ) as grpcJs.ServiceError + ) + : callback(null, { num: call.request.num }); + }, + + // This method sums the requests + clientStreamMethod( + call: ServerReadableStream, + callback: RequestCallback + ) { + let sum = 0; + let hasError = false; + let code = grpc.status.OK; + call.on('data', (data: TestRequestResponse) => { + sum += data.num; + if (data.num <= MAX_ERROR_STATUS) { + hasError = true; + code = data.num; + } + }); + call.on('end', () => { + hasError + ? callback(getError('Client Stream Method Error', code) as any) + : callback(null, { num: sum }); + }); + }, + + // This method returns an array that replicates the request, request.num of + // times + serverStreamMethod: (call: ServerWriteableStream) => { + const result = replicate(call.request); + + if (call.request.num <= MAX_ERROR_STATUS) { + call.emit( + 'error', + getError('Server Stream Method Error', call.request.num) + ); + } else { + result.forEach(element => { + call.write(element); + }); + } + call.end(); + }, + + // This method returns the request + bidiStreamMethod: (call: ServerDuplexStream) => { + call.on('data', (data: TestRequestResponse) => { + if (data.num <= MAX_ERROR_STATUS) { + call.emit( + 'error', + getError('Server Stream Method Error', data.num) + ); + } else { + call.write(data); + } + }); + call.on('end', () => { + call.end(); + }); + }, + }); + const bindAwait = promisify(server.bindAsync); + await bindAwait.call( + server, + 'localhost:' + grpcPort, + grpc.ServerCredentials.createInsecure() as grpcJs.ServerCredentials + ); + server.start(); + return server; + } + + function createClient(grpc: typeof grpcJs | typeof grpcNapi, proto: any) { + return new proto.GrpcTester( + 'localhost:' + grpcPort, + grpc.credentials.createInsecure() + ); + } + + return describe('GrpcPlugin', () => { + let contextManager: ContextManager; + + before(() => { + propagation.setGlobalPropagator(new HttpTraceContext()); + }); + + beforeEach(() => { + contextManager = new AsyncHooksContextManager().enable(); + context.setGlobalContextManager(contextManager); + }); + + afterEach(() => { + context.disable(); + }); + + it('moduleName should be grpc', () => { + assert.deepStrictEqual( + '@opentelemetry/instrumentation-grpc', + plugin.instrumentationName + ); + }); + + const requestList: TestRequestResponse[] = [{ num: 100 }, { num: 50 }]; + const resultSum = { + num: requestList.reduce((sum, x) => { + return sum + x.num; + }, 0), + }; + const methodList: TestGrpcCall[] = [ + { + description: 'unary call', + methodName: 'UnaryMethod', + method: grpcClient.unaryMethod, + request: requestList[0], + result: requestList[0], + }, + { + description: 'Unary call', + methodName: 'UnaryMethod', + method: grpcClient.UnaryMethod, + request: requestList[0], + result: requestList[0], + }, + { + description: 'camelCase unary call', + methodName: 'camelCaseMethod', + method: grpcClient.camelCaseMethod, + request: requestList[0], + result: requestList[0], + }, + { + description: 'clientStream call', + methodName: 'ClientStreamMethod', + method: grpcClient.clientStreamMethod, + request: requestList, + result: resultSum, + }, + { + description: 'serverStream call', + methodName: 'ServerStreamMethod', + method: grpcClient.serverStreamMethod, + request: resultSum, + result: replicate(resultSum), + }, + { + description: 'bidiStream call', + methodName: 'BidiStreamMethod', + method: grpcClient.bidiStreamMethod, + request: requestList, + result: requestList, + }, + ]; + + const runTest = ( + method: typeof methodList[0], + provider: NodeTracerProvider, + checkSpans = true + ) => { + it(`should ${ + checkSpans ? 'do' : 'not' + }: create a rootSpan for client and a childSpan for server - ${ + method.description + }`, async () => { + const args = [client, method.request, method.metadata]; + await (method.method as any) + .apply({}, args) + .then((result: TestRequestResponse | TestRequestResponse[]) => { + assert.ok( + checkEqual(result)(method.result), + 'gRPC call returns correct values' + ); + const spans = memoryExporter.getFinishedSpans(); + if (checkSpans) { + const incomingSpan = spans[0]; + const outgoingSpan = spans[1]; + const validations = { + name: `grpc.pkg_test.GrpcTester/${method.methodName}`, + status: grpc.status.OK, + }; + + assert.strictEqual(spans.length, 2); + assertSpan( + moduleName, + incomingSpan, + SpanKind.SERVER, + validations + ); + assertSpan( + moduleName, + outgoingSpan, + SpanKind.CLIENT, + validations + ); + assertPropagation(incomingSpan, outgoingSpan); + } else { + assert.strictEqual(spans.length, 0); + } + }); + }); + + it(`should raise an error for client childSpan/server rootSpan - ${method.description} - status = OK`, () => { + const expectEmpty = memoryExporter.getFinishedSpans(); + assert.strictEqual(expectEmpty.length, 0); + + const span = provider + .getTracer('default') + .startSpan('TestSpan', { kind: SpanKind.PRODUCER }); + return context.with(setSpan(context.active(), span), async () => { + const rootSpan = getSpan(context.active()); + if (!rootSpan) { + return assert.ok(false); + } + assert.deepStrictEqual(rootSpan, span); + + const args = [client, method.request, method.metadata]; + await (method.method as any) + .apply({}, args) + .then(() => { + // Assert + if (checkSpans) { + const spans = memoryExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2); + const serverSpan = spans[0]; + const clientSpan = spans[1]; + const validations = { + name: `grpc.pkg_test.GrpcTester/${method.methodName}`, + status: grpc.status.OK, + }; + assertSpan( + moduleName, + serverSpan, + SpanKind.SERVER, + validations + ); + assertSpan( + moduleName, + clientSpan, + SpanKind.CLIENT, + validations + ); + assertPropagation(serverSpan, clientSpan); + assert.strictEqual( + rootSpan.context().traceId, + serverSpan.spanContext.traceId + ); + assert.strictEqual( + rootSpan.context().spanId, + clientSpan.parentSpanId + ); + } + }) + .catch((err: ServiceError) => { + assert.ok(false, err); + }); + }); + }); + }; + + const insertError = ( + request: TestRequestResponse | TestRequestResponse[] + ) => (code: number) => + request instanceof Array ? [{ num: code }, ...request] : { num: code }; + + const runErrorTest = ( + method: typeof methodList[0], + key: string, + errorCode: number, + provider: NodeTracerProvider + ) => { + it(`should raise an error for client/server rootSpans: method=${method.methodName}, status=${key}`, async () => { + const expectEmpty = memoryExporter.getFinishedSpans(); + assert.strictEqual(expectEmpty.length, 0); + + const args = [client, insertError(method.request)(errorCode)]; + + await (method.method as any) + .apply({}, args) + .then(() => { + assert.ok(false); + }) + .catch((err: ServiceError) => { + const spans = memoryExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Expect 2 ended spans'); + + const validations = { + name: `grpc.pkg_test.GrpcTester/${method.methodName}`, + status: errorCode, + }; + const serverRoot = spans[0]; + const clientRoot = spans[1]; + assertSpan(moduleName, serverRoot, SpanKind.SERVER, validations); + assertSpan(moduleName, clientRoot, SpanKind.CLIENT, validations); + assertPropagation(serverRoot, clientRoot); + }); + }); + + it(`should raise an error for client childSpan/server rootSpan - ${method.description} - status = ${key}`, () => { + const expectEmpty = memoryExporter.getFinishedSpans(); + assert.strictEqual(expectEmpty.length, 0); + + const span = provider + .getTracer('default') + .startSpan('TestSpan', { kind: SpanKind.PRODUCER }); + return context.with(setSpan(context.active(), span), async () => { + const rootSpan = getSpan(context.active()); + if (!rootSpan) { + return assert.ok(false); + } + assert.deepStrictEqual(rootSpan, span); + + const args = [client, insertError(method.request)(errorCode)]; + + await (method.method as any) + .apply({}, args) + .then(() => { + assert.ok(false); + }) + .catch((err: ServiceError) => { + // Assert + const spans = memoryExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2); + const serverSpan = spans[0]; + const clientSpan = spans[1]; + const validations = { + name: `grpc.pkg_test.GrpcTester/${method.methodName}`, + status: errorCode, + }; + assertSpan(moduleName, serverSpan, SpanKind.SERVER, validations); + assertSpan(moduleName, clientSpan, SpanKind.CLIENT, validations); + assertPropagation(serverSpan, clientSpan); + assert.strictEqual( + rootSpan.context().traceId, + serverSpan.spanContext.traceId + ); + assert.strictEqual( + rootSpan.context().spanId, + clientSpan.parentSpanId + ); + }); + }); + }); + }; + + describe('enable()', () => { + const logger = new NoopLogger(); + const provider = new NodeTracerProvider({ logger }); + provider.addSpanProcessor(new SimpleSpanProcessor(memoryExporter)); + beforeEach(() => { + memoryExporter.reset(); + }); + + before(async () => { + plugin.setTracerProvider(provider); + plugin.enable(); + + const packageDefinition = await protoLoader.load(PROTO_PATH, options); + const proto = grpc.loadPackageDefinition(packageDefinition).pkg_test; + + server = await startServer(grpc, proto); + client = createClient(grpc, proto); + }); + + after(done => { + client.close(); + server.tryShutdown(() => { + plugin.disable(); + done(); + }); + }); + + methodList.forEach(method => { + describe(`Test automatic tracing for grpc remote method ${method.description}`, () => { + runTest(method, provider); + }); + }); + + methodList.forEach(method => { + describe(`Test error raising for grpc remote ${method.description}`, () => { + Object.keys(grpc.status).forEach((statusKey: string) => { + const errorCode = Number(grpc.status[statusKey as any]); + if (errorCode > grpc.status.OK) { + runErrorTest(method, statusKey, errorCode, provider); + } + }); + }); + }); + }); + + describe('disable()', () => { + const logger = new NoopLogger(); + const provider = new NodeTracerProvider({ logger }); + provider.addSpanProcessor(new SimpleSpanProcessor(memoryExporter)); + beforeEach(() => { + memoryExporter.reset(); + }); + + before(async () => { + plugin.disable(); + + const packageDefinition = await protoLoader.load(PROTO_PATH, options); + const proto = grpc.loadPackageDefinition(packageDefinition).pkg_test; + + server = await startServer(grpc, proto); + client = createClient(grpc, proto); + }); + + after(done => { + client.close(); + server.tryShutdown(() => { + done(); + }); + }); + + methodList.map(method => { + describe(`Test automatic tracing for grpc remote method ${method.description}`, () => { + runTest(method, provider, false); + }); + }); + }); + + describe('Test filtering requests using metadata', () => { + const logger = new NoopLogger(); + const provider = new NodeTracerProvider({ logger }); + provider.addSpanProcessor(new SimpleSpanProcessor(memoryExporter)); + beforeEach(() => { + memoryExporter.reset(); + }); + + before(async () => { + plugin.disable(); + plugin.setTracerProvider(provider); + plugin.setConfig({}); + plugin.enable(); + + const packageDefinition = await protoLoader.load(PROTO_PATH, options); + const proto = grpc.loadPackageDefinition(packageDefinition).pkg_test; + + server = await startServer(grpc, proto); + client = createClient(grpc, proto); + }); + + after(done => { + client.close(); + server.tryShutdown(() => { + plugin.disable(); + done(); + }); + }); + }); + + describe('Test filtering requests using options', () => { + const logger = new NoopLogger(); + const provider = new NodeTracerProvider({ logger }); + const checkSpans: { [key: string]: boolean } = { + unaryMethod: false, + UnaryMethod: false, + camelCaseMethod: false, + ClientStreamMethod: true, + ServerStreamMethod: true, + BidiStreamMethod: false, + }; + provider.addSpanProcessor(new SimpleSpanProcessor(memoryExporter)); + beforeEach(() => { + memoryExporter.reset(); + }); + + before(async () => { + const config = { + ignoreGrpcMethods: [ + 'UnaryMethod', + new RegExp(/^camel.*Method$/), + (str: string) => str === 'BidiStreamMethod', + ], + }; + plugin.disable(); + plugin.setConfig(config); + plugin.setTracerProvider(provider); + plugin.enable(); + + const packageDefinition = await protoLoader.load(PROTO_PATH, options); + const proto = grpc.loadPackageDefinition(packageDefinition).pkg_test; + + server = await startServer(grpc, proto); + client = createClient(grpc, proto); + }); + + after(done => { + client.close(); + server.tryShutdown(() => { + plugin.disable(); + done(); + }); + }); + + methodList.map(method => { + describe(`Test should ${ + checkSpans[method.methodName] ? '' : 'not ' + }create spans for grpc remote method ${method.methodName}`, () => { + runTest(method, provider, checkSpans[method.methodName]); + }); + }); + }); + }); +}; diff --git a/packages/opentelemetry-instrumentation-grpc/test/utils/assertionUtils.ts b/packages/opentelemetry-instrumentation-grpc/test/utils/assertionUtils.ts new file mode 100644 index 0000000000..055d8a7ad5 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/test/utils/assertionUtils.ts @@ -0,0 +1,80 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { SpanKind, StatusCode } from '@opentelemetry/api'; +import * as assert from 'assert'; +import type * as grpc from 'grpc'; +import type * as grpcJs from '@grpc/grpc-js'; +import { ReadableSpan } from '@opentelemetry/tracing'; +import { + hrTimeToMilliseconds, + hrTimeToMicroseconds, +} from '@opentelemetry/core'; + +export const grpcStatusCodeToOpenTelemetryStatusCode = ( + status: grpc.status | grpcJs.status +): StatusCode => { + if (status !== undefined && status === 0) { + return StatusCode.UNSET; + } + return StatusCode.ERROR; +}; + +export const assertSpan = ( + component: string, + span: ReadableSpan, + kind: SpanKind, + validations: { name: string; status: grpc.status | grpcJs.status } +) => { + assert.strictEqual(span.spanContext.traceId.length, 32); + assert.strictEqual(span.spanContext.spanId.length, 16); + assert.strictEqual(span.kind, kind); + + assert.ok(span.endTime); + assert.strictEqual(span.links.length, 0); + + assert.ok( + hrTimeToMicroseconds(span.startTime) < hrTimeToMicroseconds(span.endTime) + ); + assert.ok(hrTimeToMilliseconds(span.endTime) > 0); + + if (span.kind === SpanKind.SERVER) { + assert.ok(span.spanContext); + } + + // validations + assert.strictEqual(span.name, validations.name); + assert.strictEqual( + span.status.code, + grpcStatusCodeToOpenTelemetryStatusCode(validations.status) + ); +}; + +// Check if sourceSpan was propagated to targetSpan +export const assertPropagation = ( + incomingSpan: ReadableSpan, + outgoingSpan: ReadableSpan +) => { + const targetSpanContext = incomingSpan.spanContext; + const sourceSpanContext = outgoingSpan.spanContext; + assert.strictEqual(targetSpanContext.traceId, sourceSpanContext.traceId); + assert.strictEqual(incomingSpan.parentSpanId, sourceSpanContext.spanId); + assert.strictEqual( + targetSpanContext.traceFlags, + sourceSpanContext.traceFlags + ); + assert.notStrictEqual(targetSpanContext.spanId, sourceSpanContext.spanId); +}; diff --git a/packages/opentelemetry-instrumentation-grpc/tsconfig.json b/packages/opentelemetry-instrumentation-grpc/tsconfig.json new file mode 100644 index 0000000000..ea143a7be3 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/tsconfig.json @@ -0,0 +1,37 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "rootDir": ".", + "outDir": "build" + }, + "include": [ + "src/**/*.ts", + "test/**/*.ts" + ], + "references": [ + { + "path": "../opentelemetry-api" + }, + { + "path": "../opentelemetry-context-async-hooks" + }, + { + "path": "../opentelemetry-context-base" + }, + { + "path": "../opentelemetry-core" + }, + { + "path": "../opentelemetry-instrumentation" + }, + { + "path": "../opentelemetry-node" + }, + { + "path": "../opentelemetry-semantic-conventions" + }, + { + "path": "../opentelemetry-tracing" + } + ] +} diff --git a/packages/opentelemetry-instrumentation/src/platform/node/instrumentation.ts b/packages/opentelemetry-instrumentation/src/platform/node/instrumentation.ts index badf5dd5d3..9da46baa72 100644 --- a/packages/opentelemetry-instrumentation/src/platform/node/instrumentation.ts +++ b/packages/opentelemetry-instrumentation/src/platform/node/instrumentation.ts @@ -19,10 +19,7 @@ import * as path from 'path'; import * as RequireInTheMiddle from 'require-in-the-middle'; import * as semver from 'semver'; import { InstrumentationAbstract } from '../../instrumentation'; -import { - InstrumentationModuleDefinition, - InstrumentationModuleFile, -} from './types'; +import { InstrumentationModuleDefinition } from './types'; /** * Base abstract class for instrumenting node plugins @@ -68,11 +65,9 @@ export abstract class InstrumentationBase return true; } - for (const supportedVersions of module.supportedVersions) { - if (semver.satisfies(version, supportedVersions)) { - return true; - } - } + return module.supportedVersions.some(supportedVersion => { + return semver.satisfies(version, supportedVersion); + }); } } @@ -93,27 +88,31 @@ export abstract class InstrumentationBase return exports; } + const version = require(path.join(baseDir, 'package.json')).version; + module.moduleVersion = version; if (module.name === name) { // main module - const version = require(path.join(baseDir, 'package.json')).version; if (typeof version === 'string' && this._isSupported(name, version)) { if (typeof module.patch === 'function') { module.moduleExports = exports; if (this._enabled) { - return module.patch(exports); + return module.patch(exports, module.moduleVersion); } } } } else { // internal file - const files = module.files || []; - const file = files.find( - (file: InstrumentationModuleFile) => file.name === name - ); - if (file) { + const files = module.files ?? []; + const file = files.find(file => file.name === name); + if ( + file && + file.supportedVersions.some(supportedVersion => + semver.satisfies(version, supportedVersion) + ) + ) { file.moduleExports = exports; if (this._enabled) { - return file.patch(exports); + return file.patch(exports, module.moduleVersion); } } } @@ -130,11 +129,11 @@ export abstract class InstrumentationBase if (this._hooks.length > 0) { for (const module of this._modules) { if (typeof module.patch === 'function' && module.moduleExports) { - module.patch(module.moduleExports); + module.patch(module.moduleExports, module.moduleVersion); } for (const file of module.files) { if (file.moduleExports) { - file.patch(file.moduleExports); + file.patch(file.moduleExports, module.moduleVersion); } } } @@ -169,11 +168,11 @@ export abstract class InstrumentationBase for (const module of this._modules) { if (typeof module.unpatch === 'function' && module.moduleExports) { - module.unpatch(module.moduleExports); + module.unpatch(module.moduleExports, module.moduleVersion); } for (const file of module.files) { if (file.moduleExports) { - file.unpatch(file.moduleExports); + file.unpatch(file.moduleExports, module.moduleVersion); } } } diff --git a/packages/opentelemetry-instrumentation/src/platform/node/instrumentationNodeModuleDefinition.ts b/packages/opentelemetry-instrumentation/src/platform/node/instrumentationNodeModuleDefinition.ts index 5ae8868cab..9014dcdeeb 100644 --- a/packages/opentelemetry-instrumentation/src/platform/node/instrumentationNodeModuleDefinition.ts +++ b/packages/opentelemetry-instrumentation/src/platform/node/instrumentationNodeModuleDefinition.ts @@ -25,9 +25,9 @@ export class InstrumentationNodeModuleDefinition constructor( public name: string, public supportedVersions: string[], - public patch?: (exports: T) => T, - public unpatch?: (exports: T) => void, - files?: InstrumentationModuleFile[] + public patch?: (exports: T, moduleVersion?: string) => T, + public unpatch?: (exports: T, moduleVersion?: string) => void, + files?: InstrumentationModuleFile[] ) { this.files = files || []; } diff --git a/packages/opentelemetry-instrumentation/src/platform/node/instrumentationNodeModuleFile.ts b/packages/opentelemetry-instrumentation/src/platform/node/instrumentationNodeModuleFile.ts index 52fec8f7ec..1aa9965f1a 100644 --- a/packages/opentelemetry-instrumentation/src/platform/node/instrumentationNodeModuleFile.ts +++ b/packages/opentelemetry-instrumentation/src/platform/node/instrumentationNodeModuleFile.ts @@ -20,7 +20,8 @@ export class InstrumentationNodeModuleFile implements InstrumentationModuleFile { constructor( public name: string, - public patch: (moduleExports: T) => T, - public unpatch: (moduleExports?: T) => void + public supportedVersions: string[], + public patch: (moduleExports: T, moduleVersion?: string) => T, + public unpatch: (moduleExports?: T, moduleVersion?: string) => void ) {} } diff --git a/packages/opentelemetry-instrumentation/src/platform/node/types.ts b/packages/opentelemetry-instrumentation/src/platform/node/types.ts index 51091528eb..972659bf00 100644 --- a/packages/opentelemetry-instrumentation/src/platform/node/types.ts +++ b/packages/opentelemetry-instrumentation/src/platform/node/types.ts @@ -20,13 +20,16 @@ export interface InstrumentationModuleFile { moduleExports?: T; + /** Supported version this file */ + supportedVersions: string[]; + /** Method to patch the instrumentation */ - patch(moduleExports: T): T; + patch(moduleExports: T, moduleVersion?: string): T; /** Method to patch the instrumentation */ /** Method to unpatch the instrumentation */ - unpatch(moduleExports?: T): void; + unpatch(moduleExports?: T, moduleVersion?: string): void; } export interface InstrumentationModuleDefinition { @@ -35,15 +38,18 @@ export interface InstrumentationModuleDefinition { moduleExports?: T; + /** Instrumented module version */ + moduleVersion?: string; + /** Supported version of module */ supportedVersions: string[]; /** Module internal files to be patched */ - files: InstrumentationModuleFile[]; + files: InstrumentationModuleFile[]; /** Method to patch the instrumentation */ - patch?: (moduleExports: T) => T; + patch?: (moduleExports: T, moduleVersion?: string) => T; /** Method to unpatch the instrumentation */ - unpatch?: (moduleExports: T) => void; + unpatch?: (moduleExports: T, moduleVersion?: string) => void; } diff --git a/tsconfig.json b/tsconfig.json index 22efc769bc..da2f2f143f 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -50,6 +50,9 @@ { "path": "packages/opentelemetry-grpc-utils" }, + { + "path": "packages/opentelemetry-instrumentation-grpc" + }, { "path": "packages/opentelemetry-instrumentation-http" },