From 2cb9521f2b486ae5342e203dbeb6e81971c8dc5b Mon Sep 17 00:00:00 2001 From: Bob Evans Date: Thu, 25 Jul 2024 14:16:47 -0400 Subject: [PATCH 1/2] feat: Added `server.address` to amqplib spans --- lib/instrumentation/amqplib/amqplib.js | 247 +------------------ lib/instrumentation/amqplib/channel-model.js | 129 ++++++++++ lib/instrumentation/amqplib/channel.js | 70 ++++++ lib/instrumentation/amqplib/utils.js | 141 +++++++++++ lib/shim/message-shim/consume.js | 13 +- lib/shim/message-shim/index.js | 21 +- lib/shim/message-shim/subscribe-consume.js | 31 ++- lib/shim/specs/params/queue-message.js | 2 + lib/spans/span-event.js | 29 ++- lib/spans/streaming-span-event.js | 30 ++- lib/symbols.js | 1 + test/unit/shim/message-shim.test.js | 23 +- test/unit/spans/span-event.test.js | 4 + test/unit/spans/streaming-span-event.test.js | 5 + test/versioned/amqplib/amqp-utils.js | 10 + test/versioned/amqplib/callback.tap.js | 2 +- 16 files changed, 463 insertions(+), 295 deletions(-) create mode 100644 lib/instrumentation/amqplib/channel-model.js create mode 100644 lib/instrumentation/amqplib/channel.js create mode 100644 lib/instrumentation/amqplib/utils.js diff --git a/lib/instrumentation/amqplib/amqplib.js b/lib/instrumentation/amqplib/amqplib.js index 28232ec9ee..a356905199 100644 --- a/lib/instrumentation/amqplib/amqplib.js +++ b/lib/instrumentation/amqplib/amqplib.js @@ -6,38 +6,17 @@ 'use strict' const { - MessageSpec, - MessageSubscribeSpec, OperationSpec, - RecorderSpec, - - params: { QueueMessageParameters, DatastoreParameters } + params: { DatastoreParameters } } = require('../../shim/specs') const url = require('url') +const wrapModel = require('./channel-model') +const { setCallback } = require('./utils') +const wrapChannel = require('./channel') module.exports.instrumentPromiseAPI = instrumentChannelAPI module.exports.instrumentCallbackAPI = instrumentCallbackAPI -const CHANNEL_METHODS = [ - 'close', - 'open', - 'assertQueue', - 'checkQueue', - 'deleteQueue', - 'bindQueue', - 'unbindQueue', - 'assertExchange', - 'checkExchange', - 'deleteExchange', - 'bindExchange', - 'unbindExchange', - 'cancel', - 'prefetch', - 'recover' -] - -const TEMP_RE = /^amq\./ - /** * Register all the necessary instrumentation when using * promise based methods @@ -91,18 +70,6 @@ function instrumentAMQP(shim, amqp, promiseMode) { wrapChannel(shim) } -/** - * Helper to set the appropriate value of the callback property - * in the spec. If it's a promise set to null otherwise set it to `shim.LAST` - * - * @param {Shim} shim instance of shim - * @param {boolean} promiseMode is this promise based? - * @returns {string|null} appropriate value - */ -function setCallback(shim, promiseMode) { - return promiseMode ? null : shim.LAST -} - /** * * Instruments the connect method @@ -134,209 +101,3 @@ function wrapConnect(shim, amqp, promiseMode) { }) }) } - -/** - * - * Instruments the sendOrEnqueue and sendMessage methods of the ampqlib channel. - * - * @param {Shim} shim instance of shim - */ -function wrapChannel(shim) { - const libChannel = shim.require('./lib/channel') - if (!libChannel?.Channel?.prototype) { - shim.logger.debug('Could not get Channel class to instrument.') - return - } - - const proto = libChannel.Channel.prototype - if (shim.isWrapped(proto.sendMessage)) { - shim.logger.trace('Channel already instrumented.') - return - } - shim.logger.trace('Instrumenting basic Channel class.') - - shim.wrap(proto, 'sendOrEnqueue', function wrapSendOrEnqueue(shim, fn) { - if (!shim.isFunction(fn)) { - return fn - } - - return function wrappedSendOrEnqueue() { - const segment = shim.getSegment() - const cb = arguments[arguments.length - 1] - if (!shim.isFunction(cb) || !segment) { - shim.logger.debug({ cb: !!cb, segment: !!segment }, 'Not binding sendOrEnqueue callback') - return fn.apply(this, arguments) - } - - shim.logger.trace('Binding sendOrEnqueue callback to %s', segment.name) - const args = shim.argsToArray.apply(shim, arguments) - args[args.length - 1] = shim.bindSegment(cb, segment) - return fn.apply(this, args) - } - }) - - shim.recordProduce(proto, 'sendMessage', function recordSendMessage(shim, fn, n, args) { - const fields = args[0] - if (!fields) { - return null - } - const isDefault = fields.exchange === '' - let exchange = 'Default' - if (!isDefault) { - exchange = TEMP_RE.test(fields.exchange) ? null : fields.exchange - } - - return new MessageSpec({ - destinationName: exchange, - destinationType: shim.EXCHANGE, - routingKey: fields.routingKey, - headers: fields.headers, - parameters: getParameters(Object.create(null), fields) - }) - }) -} - -/** - * Sets the relevant message parameters - * - * @param {object} parameters object used to store the message parameters - * @param {object} fields fields from the sendMessage method - * @returns {QueueMessageParameters} parameters updated parameters - */ -function getParameters(parameters, fields) { - if (fields.routingKey) { - parameters.routing_key = fields.routingKey - } - if (fields.correlationId) { - parameters.correlation_id = fields.correlationId - } - if (fields.replyTo) { - parameters.reply_to = fields.replyTo - } - - return new QueueMessageParameters(parameters) -} - -/** - * Sets the QueueMessageParameters from the amqp message - * - * @param {object} message queue message - * @returns {QueueMessageParameters} parameters from message - */ -function getParametersFromMessage(message) { - const parameters = Object.create(null) - getParameters(parameters, message.fields) - getParameters(parameters, message.properties) - return parameters -} - -/** - * - * Instruments the relevant channel callback_model or channel_model. - * - * @param {Shim} shim instance of shim - * @param {object} Model either channel or callback model - * @param {boolean} promiseMode is this promise based? - */ -function wrapModel(shim, Model, promiseMode) { - if (!Model.Channel?.prototype) { - shim.logger.debug( - `Could not get ${promiseMode ? 'promise' : 'callback'} model Channel to instrument.` - ) - } - - const proto = Model.Channel.prototype - if (shim.isWrapped(proto.consume)) { - shim.logger.trace(`${promiseMode ? 'promise' : 'callback'} model already instrumented.`) - return - } - - shim.record(proto, CHANNEL_METHODS, function recordChannelMethod(shim, fn, name) { - return new RecorderSpec({ - name: 'Channel#' + name, - callback: setCallback(shim, promiseMode), - promise: promiseMode - }) - }) - - shim.recordConsume( - proto, - 'get', - new MessageSpec({ - destinationName: shim.FIRST, - callback: setCallback(shim, promiseMode), - promise: promiseMode, - after: function handleConsumedMessage({ shim, result, args, segment }) { - if (!shim.agent.config.message_tracer.segment_parameters.enabled) { - shim.logger.trace('Not capturing segment parameters') - return - } - - // the message is the param when using the promised based model - const message = promiseMode ? result : args?.[1] - if (!message) { - shim.logger.trace('No results from consume.') - return null - } - const parameters = getParametersFromMessage(message) - shim.copySegmentParameters(segment, parameters) - } - }) - ) - - shim.recordPurgeQueue(proto, 'purgeQueue', function recordPurge(shim, fn, name, args) { - let queue = args[0] - if (TEMP_RE.test(queue)) { - queue = null - } - return new MessageSpec({ - queue, - promise: promiseMode, - callback: setCallback(shim, promiseMode) - }) - }) - - shim.recordSubscribedConsume( - proto, - 'consume', - new MessageSubscribeSpec({ - name: 'amqplib.Channel#consume', - queue: shim.FIRST, - consumer: shim.SECOND, - promise: promiseMode, - callback: promiseMode ? null : shim.FOURTH, - messageHandler: describeMessage - }) - ) -} - -/** - * Extracts the appropriate messageHandler parameters for the consume method. - * - * @param {Shim} shim instance of shim - * @param {Array} args arguments passed to the consume method - * @returns {object} message params - */ -function describeMessage(shim, args) { - const [message] = args - - if (!message?.properties) { - shim.logger.debug({ message: message }, 'Failed to find message in consume arguments.') - return null - } - - const parameters = getParametersFromMessage(message) - let exchangeName = message?.fields?.exchange || 'Default' - - if (TEMP_RE.test(exchangeName)) { - exchangeName = null - } - - return new MessageSpec({ - destinationName: exchangeName, - destinationType: shim.EXCHANGE, - routingKey: message?.fields?.routingKey, - headers: message.properties.headers, - parameters - }) -} diff --git a/lib/instrumentation/amqplib/channel-model.js b/lib/instrumentation/amqplib/channel-model.js new file mode 100644 index 0000000000..f994b09f1f --- /dev/null +++ b/lib/instrumentation/amqplib/channel-model.js @@ -0,0 +1,129 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' +const { MessageSpec, MessageSubscribeSpec, RecorderSpec } = require('../../shim/specs') +const CHANNEL_METHODS = [ + 'close', + 'open', + 'assertQueue', + 'checkQueue', + 'deleteQueue', + 'bindQueue', + 'unbindQueue', + 'assertExchange', + 'checkExchange', + 'deleteExchange', + 'bindExchange', + 'unbindExchange', + 'cancel', + 'prefetch', + 'recover' +] +const { + describeMessage, + setCallback, + parseConnect, + getParametersFromMessage, + TEMP_RE +} = require('./utils') + +/** + * + * Instruments the relevant channel callback_model or channel_model. + * + * @param {Shim} shim instance of shim + * @param {object} Model either channel or callback model + * @param {boolean} promiseMode is this promise based? + */ +module.exports = function wrapModel(shim, Model, promiseMode) { + if (!Model.Channel?.prototype) { + shim.logger.debug( + `Could not get ${promiseMode ? 'promise' : 'callback'} model Channel to instrument.` + ) + return + } + + const proto = Model.Channel.prototype + if (shim.isWrapped(proto.consume)) { + shim.logger.trace(`${promiseMode ? 'promise' : 'callback'} model already instrumented.`) + return + } + + recordChannelMethods({ shim, proto, promiseMode }) + recordPurge({ shim, proto, promiseMode }) + recordGet({ shim, proto, promiseMode }) + recordConsume({ shim, proto, promiseMode }) +} + +/** + * Record spans for common methods on channel + * + * @param {Channel} proto prototype of Model.Channel + */ +function recordChannelMethods({ shim, proto, promiseMode }) { + shim.record(proto, CHANNEL_METHODS, function recordChannelMethod(shim, fn, name) { + return new RecorderSpec({ + name: 'Channel#' + name, + callback: setCallback(shim, promiseMode), + promise: promiseMode + }) + }) +} + +function recordPurge({ shim, proto, promiseMode }) { + shim.recordPurgeQueue(proto, 'purgeQueue', function purge(shim, fn, name, args) { + let queue = args[0] + if (TEMP_RE.test(queue)) { + queue = null + } + return new MessageSpec({ + queue, + promise: promiseMode, + callback: setCallback(shim, promiseMode) + }) + }) +} + +function recordGet({ shim, proto, promiseMode }) { + shim.recordConsume(proto, 'get', function wrapGet() { + const { host, port } = parseConnect(this?.connection?.stream) + return new MessageSpec({ + destinationName: shim.FIRST, + callback: setCallback(shim, promiseMode), + promise: promiseMode, + after: function handleConsumedMessage({ shim, result, args, segment }) { + if (!shim.agent.config.message_tracer.segment_parameters.enabled) { + shim.logger.trace('Not capturing segment parameters') + return + } + + // the message is the param when using the promised based model + const message = promiseMode ? result : args?.[1] + if (!message) { + shim.logger.trace('No results from consume.') + return null + } + const parameters = getParametersFromMessage({ message, host, port }) + shim.copySegmentParameters(segment, parameters) + } + }) + }) +} + +function recordConsume({ shim, proto, promiseMode }) { + shim.recordSubscribedConsume(proto, 'consume', function consume() { + const { host, port } = parseConnect(this?.connection?.stream) + return new MessageSubscribeSpec({ + name: 'amqplib.Channel#consume', + queue: shim.FIRST, + consumer: shim.SECOND, + promise: promiseMode, + parameters: { host, port }, + callback: promiseMode ? null : shim.FOURTH, + messageHandler: describeMessage({ host, port }) + }) + }) +} diff --git a/lib/instrumentation/amqplib/channel.js b/lib/instrumentation/amqplib/channel.js new file mode 100644 index 0000000000..903ca1a4b2 --- /dev/null +++ b/lib/instrumentation/amqplib/channel.js @@ -0,0 +1,70 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' +const { MessageSpec } = require('../../shim/specs') +const { parseConnect, getParameters, TEMP_RE } = require('./utils') + +/** + * + * Instruments the sendOrEnqueue and sendMessage methods of the ampqlib channel. + * + * @param {Shim} shim instance of shim + */ +module.exports = function wrapChannel(shim) { + const libChannel = shim.require('./lib/channel') + if (!libChannel?.Channel?.prototype) { + shim.logger.debug('Could not get Channel class to instrument.') + return + } + + const proto = libChannel.Channel.prototype + if (shim.isWrapped(proto.sendMessage)) { + shim.logger.trace('Channel already instrumented.') + return + } + shim.logger.trace('Instrumenting basic Channel class.') + + shim.wrap(proto, 'sendOrEnqueue', function wrapSendOrEnqueue(shim, fn) { + if (!shim.isFunction(fn)) { + return fn + } + + return function wrappedSendOrEnqueue() { + const segment = shim.getSegment() + const cb = arguments[arguments.length - 1] + if (!shim.isFunction(cb) || !segment) { + shim.logger.debug({ cb: !!cb, segment: !!segment }, 'Not binding sendOrEnqueue callback') + return fn.apply(this, arguments) + } + + shim.logger.trace('Binding sendOrEnqueue callback to %s', segment.name) + const args = shim.argsToArray.apply(shim, arguments) + args[args.length - 1] = shim.bindSegment(cb, segment) + return fn.apply(this, args) + } + }) + + shim.recordProduce(proto, 'sendMessage', function recordSendMessage(shim, fn, n, args) { + const fields = args[0] + if (!fields) { + return null + } + const isDefault = fields.exchange === '' + let exchange = 'Default' + if (!isDefault) { + exchange = TEMP_RE.test(fields.exchange) ? null : fields.exchange + } + const { host, port } = parseConnect(this?.connection?.stream) + + return new MessageSpec({ + destinationName: exchange, + destinationType: shim.EXCHANGE, + routingKey: fields.routingKey, + headers: fields.headers, + parameters: getParameters({ parameters: Object.create(null), fields, host, port }) + }) + }) +} diff --git a/lib/instrumentation/amqplib/utils.js b/lib/instrumentation/amqplib/utils.js new file mode 100644 index 0000000000..76caca2e05 --- /dev/null +++ b/lib/instrumentation/amqplib/utils.js @@ -0,0 +1,141 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' +const { + MessageSpec, + params: { QueueMessageParameters } +} = require('../../shim/specs') +const { amqpConnection } = require('../../symbols') +const TEMP_RE = /^amq\./ + +/** + * Wrapper around message handler to pass host/port + * + * @param {object} params to function + * @param {string} params.host hostname + * @param {number} params.port port + * @returns {function} message handler + */ +function describeMessage({ host, port }) { + /** + * Extracts the appropriate messageHandler parameters for the consume method. + * + * @param {Shim} shim instance of shim + * @param {Array} args arguments passed to the consume method + * @returns {object} message params + */ + return function messageHandler(shim, args) { + const [message] = args + + if (!message?.properties) { + shim.logger.debug({ message: message }, 'Failed to find message in consume arguments.') + return null + } + + const parameters = getParametersFromMessage({ message, host, port }) + let exchangeName = message?.fields?.exchange || 'Default' + + if (TEMP_RE.test(exchangeName)) { + exchangeName = null + } + + return new MessageSpec({ + destinationName: exchangeName, + destinationType: shim.EXCHANGE, + routingKey: message?.fields?.routingKey, + headers: message.properties.headers, + parameters + }) + } +} + +/** + * Sets the relevant message parameters + * + * @param {object} params to function + * @param {object} params.parameters object used to store the message parameters + * @param {object} params.fields fields from the sendMessage method + * @param {string} params.host hostname + * @param {number} params.port port + * @returns {QueueMessageParameters} parameters updated parameters + */ +function getParameters({ parameters, fields, host, port }) { + if (fields.routingKey) { + parameters.routing_key = fields.routingKey + } + if (fields.correlationId) { + parameters.correlation_id = fields.correlationId + } + if (fields.replyTo) { + parameters.reply_to = fields.replyTo + } + + if (host) { + parameters.host = host + } + + if (port) { + parameters.port = port + } + + return new QueueMessageParameters(parameters) +} + +/** + * Sets the QueueMessageParameters from the amqp message + * + * @param {object} params to function + * @param {object} params.message queue message + * @param {string} params.host host + * @param {number} params.port port + * @returns {QueueMessageParameters} parameters from message + */ +function getParametersFromMessage({ message, host, port }) { + const parameters = Object.create(null) + getParameters({ parameters, fields: message.fields, host, port }) + getParameters({ parameters, fields: message.properties }) + return parameters +} + +/** + * Extracts the host/port from the amqp socket connection. + * Stores on connection as symbol to only parse once. + * + * @param {Socket} socket amqp connection + * @returns {object} {host, port } of connection + */ +function parseConnect(socket) { + if (socket[amqpConnection]) { + return socket[amqpConnection] + } + const host = ['127.0.0.1', '::1', '[::1]'].includes(socket?.remoteAddress) + ? 'localhost' + : socket?.remoteAddress + const port = socket?.remotePort + socket[amqpConnection] = { host, port } + return { host, port } +} + +/** + * Helper to set the appropriate value of the callback property + * in the spec. If it's a promise set to null otherwise set it to `shim.LAST` + * + * @param {Shim} shim instance of shim + * @param {boolean} promiseMode is this promise based? + * @returns {string|null} appropriate value + */ +function setCallback(shim, promiseMode) { + return promiseMode ? null : shim.LAST +} + +module.exports = { + describeMessage, + getParameters, + getParametersFromMessage, + parseConnect, + setCallback, + TEMP_RE +} diff --git a/lib/shim/message-shim/consume.js b/lib/shim/message-shim/consume.js index 91ffa44cb9..97c011df74 100644 --- a/lib/shim/message-shim/consume.js +++ b/lib/shim/message-shim/consume.js @@ -18,16 +18,21 @@ module.exports = createRecorder * @param {Function} params.fn consumer function * @param {string} params.fnName name of function * @param {Array} params.args arguments passed to original consume function + * @param {Object} params.ctx this binding of the original function * @param {specs.MessageSpec} params.spec spec for the wrapped consume function * @returns {specs.MessageSpec} new spec */ -function updateSpecFromArgs({ shim, fn, fnName, args, spec }) { +function updateSpecFromArgs({ shim, fn, fnName, args, spec, ctx }) { let msgDesc = null if (shim.isFunction(spec)) { - msgDesc = spec(shim, fn, fnName, args) + msgDesc = spec.call(ctx, shim, fn, fnName, args) } else { msgDesc = spec - const destIdx = shim.normalizeIndex(args.length, spec.destinationName) + } + + const destNameIsArg = shim.isNumber(msgDesc.destinationName) + if (destNameIsArg) { + const destIdx = shim.normalizeIndex(args.length, msgDesc.destinationName) if (destIdx !== null) { msgDesc.destinationName = args[destIdx] } @@ -48,7 +53,7 @@ function updateSpecFromArgs({ shim, fn, fnName, args, spec }) { * @returns {specs.MessageSpec} updated spec with logic to name segment and apply the genericRecorder */ function createRecorder({ spec, shim, fn, fnName, args }) { - const msgDesc = updateSpecFromArgs({ shim, fn, fnName, args, spec }) + const msgDesc = updateSpecFromArgs({ shim, fn, fnName, args, spec, ctx: this }) // Adds details needed by createSegment when used with a spec msgDesc.name = _nameMessageSegment(shim, msgDesc, shim._metrics.CONSUME) msgDesc.recorder = genericRecorder diff --git a/lib/shim/message-shim/index.js b/lib/shim/message-shim/index.js index 9a6cfc0506..6ecd9eba02 100644 --- a/lib/shim/message-shim/index.js +++ b/lib/shim/message-shim/index.js @@ -298,7 +298,7 @@ function recordConsume(nodule, properties, spec) { } return this.record(nodule, properties, function wrapConsume(shim, fn, fnName, args) { - return createRecorder({ spec, shim, fn, fnName, args }) + return createRecorder.call(this, { spec, shim, fn, fnName, args }) }) } @@ -405,29 +405,21 @@ function recordSubscribedConsume(nodule, properties, spec) { properties = null } - // Make sure our spec has what we need. - if (!this.isFunction(spec.messageHandler)) { - this.logger.debug('spec.messageHandler should be a function') - return nodule - } else if (!this.isNumber(spec.consumer)) { - this.logger.debug('spec.consumer is required for recordSubscribedConsume') - return nodule - } - - const destNameIsArg = this.isNumber(spec.destinationName) - // Must wrap the subscribe method independently to ensure that we can wrap // the consumer regardless of transaction state. - const wrapped = this.wrap(nodule, properties, function wrapSubscribe(shim, fn) { + const wrapped = this.wrap(nodule, properties, function wrapSubscribe(shim, fn, name) { if (!shim.isFunction(fn)) { return fn } - return createSubscriberWrapper({ shim, fn, spec, destNameIsArg }) + return createSubscriberWrapper.call(this, { shim, fn, spec, name }) }) // Wrap the subscriber with segment creation. return this.record(wrapped, properties, function recordSubscribe(shim, fn, name, args) { + if (shim.isFunction(spec)) { + spec = spec.call(this, shim, fn, name, args) + } // Make sure the specified consumer and callback indexes do not overlap. // This could happen for instance if the function signature is // `fn(consumer [, callback])` and specified as `consumer: shim.FIRST`, @@ -442,6 +434,7 @@ function recordSubscribedConsume(nodule, properties, spec) { name: spec.name || name, callback: cbIdx, promise: spec.promise, + parameters: spec.parameters, stream: false, internal: false }) diff --git a/lib/shim/message-shim/subscribe-consume.js b/lib/shim/message-shim/subscribe-consume.js index 39467cce97..8a38e805f5 100644 --- a/lib/shim/message-shim/subscribe-consume.js +++ b/lib/shim/message-shim/subscribe-consume.js @@ -6,7 +6,6 @@ 'use strict' const ATTR_DESTS = require('../../config/attribute-filter').DESTINATIONS const messageTransactionRecorder = require('../../metrics/recorders/message-transaction') -const props = require('../../util/properties') const specs = require('../specs') module.exports = createSubscriberWrapper @@ -38,12 +37,29 @@ function _nameMessageTransaction(shim, msgDesc) { * @param {MessageShim} params.shim instance of shim * @param {Function} params.fn subscriber function * @param {specs.MessageSubscribeSpec} params.spec spec for subscriber + * @param params.name * @param {boolean} params.destNameIsArg flag to state if destination is an argument * @returns {Function} wrapped subscribe function */ -function createSubscriberWrapper({ shim, fn, spec, destNameIsArg }) { +function createSubscriberWrapper({ shim, fn, spec, name }) { return function wrappedSubscribe() { const args = shim.argsToArray.apply(shim, arguments) + + if (shim.isFunction(spec)) { + spec = spec.call(this, shim, fn, name, args) + } + + // Make sure our spec has what we need. + if (!shim.isFunction(spec.messageHandler)) { + shim.logger.debug('spec.messageHandler should be a function') + return fn.apply(this, args) + } else if (!shim.isNumber(spec.consumer)) { + shim.logger.debug('spec.consumer is required for recordSubscribedConsume') + return fn.apply(this, args) + } + + const destNameIsArg = shim.isNumber(spec.destinationName) + const queueIdx = shim.normalizeIndex(args.length, spec.queue) const consumerIdx = shim.normalizeIndex(args.length, spec.consumer) const queue = queueIdx === null ? null : args[queueIdx] @@ -59,7 +75,7 @@ function createSubscriberWrapper({ shim, fn, spec, destNameIsArg }) { if (consumerIdx !== null && !spec.functions) { args[consumerIdx] = shim.wrap( args[consumerIdx], - makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) + makeWrapConsumer.call(this, { spec, queue, destinationName, destNameIsArg }) ) } @@ -69,7 +85,8 @@ function createSubscriberWrapper({ shim, fn, spec, destNameIsArg }) { if (args[consumerIdx][name]) { args[consumerIdx][name] = shim.wrap( args[consumerIdx][name], - makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) + // bind the proper this scope into the consumers + makeWrapConsumer.call(this, { spec, queue, destinationName, destNameIsArg }) ) } }) @@ -101,7 +118,7 @@ function makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) { return consumer } - const consumerWrapper = createConsumerWrapper({ shim, consumer, spec }) + const consumerWrapper = createConsumerWrapper.call(this, { shim, consumer, spec }) return shim.bindCreateTransaction( consumerWrapper, new specs.TransactionSpec({ @@ -155,7 +172,9 @@ function createConsumerWrapper({ shim, spec, consumer }) { // Add would-be baseSegment attributes to transaction trace for (const key in msgDesc.parameters) { - if (props.hasOwn(msgDesc.parameters, key)) { + if (['host', 'port'].includes(key)) { + tx.baseSegment.addAttribute(key, msgDesc.parameters[key]) + } else { tx.trace.attributes.addAttribute( ATTR_DESTS.NONE, 'message.parameters.' + key, diff --git a/lib/shim/specs/params/queue-message.js b/lib/shim/specs/params/queue-message.js index 3e16e0a29c..9b3e5ed965 100644 --- a/lib/shim/specs/params/queue-message.js +++ b/lib/shim/specs/params/queue-message.js @@ -25,6 +25,8 @@ class QueueMessageParameters { this.correlation_id = params.correlation_id ?? null this.reply_to = params.reply_to ?? null this.routing_key = params.routing_key ?? null + this.host = params.host ?? null + this.port = params.port ?? null } } diff --git a/lib/spans/span-event.js b/lib/spans/span-event.js index d366339fae..2b63b1b021 100644 --- a/lib/spans/span-event.js +++ b/lib/spans/span-event.js @@ -22,6 +22,7 @@ const EXTERNAL_REGEX = /^(?:Truncated\/)?External\// const DATASTORE_REGEX = /^(?:Truncated\/)?Datastore\// const EMPTY_USER_ATTRS = Object.freeze(Object.create(null)) +const SERVER_ADDRESS = 'server.address' /** * All the intrinsic attributes for span events, regardless of kind. @@ -61,6 +62,16 @@ class SpanEvent { this.customAttributes = customAttributes this.attributes = attributes this.intrinsics = new SpanIntrinsics() + + if (attributes.host) { + this.addAttribute(SERVER_ADDRESS, attributes.host) + attributes.host = null + } + + if (attributes.port) { + this.addAttribute('server.port', attributes.port, true) + attributes.port = null + } } static get CATEGORIES() { @@ -195,15 +206,10 @@ class HttpSpanEvent extends SpanEvent { } if (attributes.hostname) { - this.addAttribute('server.address', attributes.hostname) + this.addAttribute(SERVER_ADDRESS, attributes.hostname) attributes.hostname = null } - if (attributes.port) { - this.addAttribute('server.port', attributes.port, true) - attributes.port = null - } - if (attributes.procedure) { this.addAttribute('http.method', attributes.procedure) this.addAttribute('http.request.method', attributes.procedure) @@ -259,17 +265,18 @@ class DatastoreSpanEvent extends SpanEvent { attributes.database_name = null } - if (attributes.host) { - this.addAttribute('peer.hostname', attributes.host) - this.addAttribute('server.address', attributes.host) + // `attributes.host` was translated in SpanEvent class + // use `server.address` to assign to `peer.hostname` + const serverAddress = attributes[SERVER_ADDRESS] + if (serverAddress) { + this.addAttribute('peer.hostname', serverAddress) if (attributes.port_path_or_id) { - const address = `${attributes.host}:${attributes.port_path_or_id}` + const address = `${serverAddress}:${attributes.port_path_or_id}` this.addAttribute('peer.address', address) this.addAttribute('server.port', attributes.port_path_or_id, true) attributes.port_path_or_id = null } - attributes.host = null } } diff --git a/lib/spans/streaming-span-event.js b/lib/spans/streaming-span-event.js index adc717b3ec..da24a7c564 100644 --- a/lib/spans/streaming-span-event.js +++ b/lib/spans/streaming-span-event.js @@ -37,7 +37,7 @@ class StreamingSpanEvent { * @param {object} customAttributes Initial set of custom attributes. * Must be pre-filtered and truncated. */ - constructor(traceId, agentAttributes, customAttributes) { + constructor(traceId, agentAttributes = {}, customAttributes) { this._traceId = traceId this._intrinsicAttributes = new StreamingSpanAttributes() @@ -46,7 +46,16 @@ class StreamingSpanEvent { this._intrinsicAttributes.addAttribute('category', CATEGORIES.GENERIC) this._customAttributes = new StreamingSpanAttributes(customAttributes) - this._agentAttributes = new StreamingSpanAttributes(agentAttributes) + const { host, port, ...agentAttrs } = agentAttributes + this._agentAttributes = new StreamingSpanAttributes(agentAttrs) + + if (host) { + this.addAgentAttribute('server.address', host) + } + + if (port) { + this.addAgentAttribute('server.port', port, true) + } } /** @@ -183,7 +192,7 @@ class StreamingHttpSpanEvent extends StreamingSpanEvent { */ constructor(traceId, agentAttributes, customAttributes) { // remove mapped attributes before creating other agentAttributes - const { library, url, hostname, port, procedure, ...agentAttrs } = agentAttributes + const { library, url, hostname, procedure, ...agentAttrs } = agentAttributes super(traceId, agentAttrs, customAttributes) this.addIntrinsicAttribute('category', CATEGORIES.HTTP) @@ -196,11 +205,6 @@ class StreamingHttpSpanEvent extends StreamingSpanEvent { if (hostname) { this.addAgentAttribute('server.address', hostname) - agentAttributes.hostname = null - } - - if (port) { - this.addAgentAttribute('server.port', port, true) } if (procedure) { @@ -239,7 +243,6 @@ class StreamingDatastoreSpanEvent extends StreamingSpanEvent { sql, sql_obfuscated, database_name, - host, port_path_or_id, ...agentAttrs } = agentAttributes @@ -273,12 +276,13 @@ class StreamingDatastoreSpanEvent extends StreamingSpanEvent { this.addAgentAttribute('db.instance', database_name) } - if (host) { - this.addAgentAttribute('peer.hostname', host) - this.addAgentAttribute('server.address', host) + // `host` was translated in StreamingSpanEvent class + // use `server.address` to assign to `peer.hostname` + if (agentAttributes.host) { + this.addAgentAttribute('peer.hostname', agentAttributes.host) if (port_path_or_id) { - const address = `${host}:${port_path_or_id}` + const address = `${agentAttributes.host}:${port_path_or_id}` this.addAgentAttribute('peer.address', address) this.addAgentAttribute('server.port', port_path_or_id, true) } diff --git a/lib/symbols.js b/lib/symbols.js index 99bed59f81..6d65082d69 100644 --- a/lib/symbols.js +++ b/lib/symbols.js @@ -6,6 +6,7 @@ 'use strict' module.exports = { + amqpConnection: Symbol('amqpConnection'), clm: Symbol('codeLevelMetrics'), context: Symbol('context'), databaseName: Symbol('databaseName'), diff --git a/test/unit/shim/message-shim.test.js b/test/unit/shim/message-shim.test.js index 5a3d49a27b..2f4e86543d 100644 --- a/test/unit/shim/message-shim.test.js +++ b/test/unit/shim/message-shim.test.js @@ -505,6 +505,7 @@ tap.test('MessageShim', function (t) { t.test('should create a consume segment', function (t) { shim.recordConsume(wrappable, 'getActiveSegment', function () { + t.same(this, wrappable, 'make sure this is in tact') return new MessageSpec({ destinationName: 'foobar' }) }) @@ -885,6 +886,19 @@ tap.test('MessageShim', function (t) { t.not(shim.isWrapped(wrappable.name)) t.end() }) + + t.test('should allow spec to be a function', function (t) { + shim.recordSubscribedConsume(wrappable, 'name', function () { + t.same(this, wrappable, 'should preserve this context') + return { + consumer: shim.FIRST, + messageHandler: function () {}, + wrapper: function () {} + } + }) + t.not(shim.isWrapped(wrappable.name)) + t.end() + }) }) t.test('#recordSubscribedConsume wrapper', function (t) { @@ -1227,7 +1241,7 @@ tap.test('MessageShim', function (t) { }) t.test('should wrap object key of consumer', function (t) { - t.plan(3) + t.plan(4) const message = { foo: 'bar' } const subscriber = function subscriber(consumer) { consumer.eachMessage(message) @@ -1244,14 +1258,17 @@ tap.test('MessageShim', function (t) { }) } }) - wrapped({ + + const handler = { eachMessage: function consumer(msg) { + t.same(this, handler) const segment = shim.getSegment() t.equal(segment.name, 'OtherTransaction/Message/RabbitMQ/Exchange/Named/exchange.foo') t.equal(msg, message) t.end() } - }) + } + wrapped(handler) }) }) }) diff --git a/test/unit/spans/span-event.test.js b/test/unit/spans/span-event.test.js index 7121d35224..0292e7b3e6 100644 --- a/test/unit/spans/span-event.test.js +++ b/test/unit/spans/span-event.test.js @@ -68,6 +68,8 @@ tap.test('fromSegment()', (t) => { setTimeout(() => { const segment = agent.tracer.getTransaction().trace.root.children[0] segment.addSpanAttribute('SpiderSpan', 'web') + segment.addSpanAttribute('host', 'my-host') + segment.addSpanAttribute('port', 222) const spanContext = segment.getSpanContext() spanContext.addCustomAttribute('Span Lee', 'no prize') @@ -108,6 +110,8 @@ tap.test('fromSegment()', (t) => { const hasOwnAttribute = Object.hasOwnProperty.bind(attributes) t.ok(hasOwnAttribute('SpiderSpan'), 'Should have attribute added through segment') + t.equal(attributes['server.address'], 'my-host') + t.equal(attributes['server.port'], 222) // Should have no http properties. t.notOk(hasOwnAttribute('externalLibrary')) diff --git a/test/unit/spans/streaming-span-event.test.js b/test/unit/spans/streaming-span-event.test.js index d61cb2ed22..80d82171f7 100644 --- a/test/unit/spans/streaming-span-event.test.js +++ b/test/unit/spans/streaming-span-event.test.js @@ -67,6 +67,8 @@ tap.test('fromSegment()', (t) => { const segment = agent.tracer.getTransaction().trace.root.children[0] const spanContext = segment.getSpanContext() spanContext.addCustomAttribute('Span Lee', 'no prize') + segment.addSpanAttribute('host', 'my-host') + segment.addSpanAttribute('port', 22) const span = StreamingSpanEvent.fromSegment(segment, 'parent') @@ -102,6 +104,9 @@ tap.test('fromSegment()', (t) => { const agentAttributes = span._agentAttributes t.ok(agentAttributes) + t.same(agentAttributes['server.address'], { [STRING_TYPE]: 'my-host' }) + t.same(agentAttributes['server.port'], { [INT_TYPE]: 22 }) + // Should have no http properties. const hasOwnAttribute = Object.hasOwnProperty.bind(agentAttributes) t.notOk(hasOwnAttribute('externalLibrary')) diff --git a/test/versioned/amqplib/amqp-utils.js b/test/versioned/amqplib/amqp-utils.js index 0599a08d90..f05f2e988f 100644 --- a/test/versioned/amqplib/amqp-utils.js +++ b/test/versioned/amqplib/amqp-utils.js @@ -133,6 +133,9 @@ function verifyConsumeTransaction(t, tx, exchange, queue, routingKey) { 'OtherTransaction/Message/RabbitMQ/Exchange/Named/' + exchange ) t.equal(consume, tx.baseSegment) + const segmentAttrs = consume.getAttributes() + t.equal(segmentAttrs.host, params.rabbitmq_host, 'should have host on segment') + t.equal(segmentAttrs.port, params.rabbitmq_port, 'should have port on segment') const attributes = tx.trace.attributes.get(DESTINATIONS.TRANS_TRACE) t.equal( @@ -158,6 +161,8 @@ function verifySendToQueue(t, tx) { 'MessageBroker/RabbitMQ/Exchange/Produce/Named/Default' ) const attributes = segment.getAttributes() + t.equal(attributes.host, params.rabbitmq_host, 'should have host on segment') + t.equal(attributes.port, params.rabbitmq_port, 'should have port on segment') t.equal(attributes.routing_key, 'testQueue', 'should store routing key') t.equal(attributes.reply_to, 'my.reply.queue', 'should store reply to') t.equal(attributes.correlation_id, 'correlation-id', 'should store correlation id') @@ -225,6 +230,9 @@ function verifyProduce(t, tx, exchangeName, routingKey) { } else { t.notOk(attributes.routing_key, 'should not have routing key') } + + t.equal(attributes.host, params.rabbitmq_host, 'should have host on segment') + t.equal(attributes.port, params.rabbitmq_port, 'should have port on segment') } function verifyGet({ t, tx, exchangeName, routingKey, queue, assertAttr }) { @@ -240,6 +248,8 @@ function verifyGet({ t, tx, exchangeName, routingKey, queue, assertAttr }) { if (assertAttr) { const segment = metrics.findSegment(tx.trace.root, consumeName) const attributes = segment.getAttributes() + t.equal(attributes.host, params.rabbitmq_host, 'should have host on segment') + t.equal(attributes.port, params.rabbitmq_port, 'should have port on segment') t.equal(attributes.routing_key, routingKey, 'should have routing key on get') } } diff --git a/test/versioned/amqplib/callback.tap.js b/test/versioned/amqplib/callback.tap.js index 0d8d5fa186..9c034d848e 100644 --- a/test/versioned/amqplib/callback.tap.js +++ b/test/versioned/amqplib/callback.tap.js @@ -375,7 +375,7 @@ tap.test('amqplib callback instrumentation', function (t) { function (msg) { const consumeTxnHandle = api.getTransaction() const consumeTxn = consumeTxnHandle._transaction - t.notEqual(consumeTxn, tx, 'should not be in original transaction') + t.not(consumeTxn, tx, 'should not be in original transaction') t.ok(msg, 'should receive a message') const body = msg.content.toString('utf8') From d5262550d005f647ecdf3ffb078830196690ba5c Mon Sep 17 00:00:00 2001 From: Bob Evans Date: Tue, 30 Jul 2024 15:28:07 -0400 Subject: [PATCH 2/2] chore: removed unncessary comments around attribute translation --- lib/spans/span-event.js | 3 +-- lib/spans/streaming-span-event.js | 2 -- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/spans/span-event.js b/lib/spans/span-event.js index 2b63b1b021..fcc04064e9 100644 --- a/lib/spans/span-event.js +++ b/lib/spans/span-event.js @@ -265,9 +265,8 @@ class DatastoreSpanEvent extends SpanEvent { attributes.database_name = null } - // `attributes.host` was translated in SpanEvent class - // use `server.address` to assign to `peer.hostname` const serverAddress = attributes[SERVER_ADDRESS] + if (serverAddress) { this.addAttribute('peer.hostname', serverAddress) diff --git a/lib/spans/streaming-span-event.js b/lib/spans/streaming-span-event.js index da24a7c564..4d435be4e2 100644 --- a/lib/spans/streaming-span-event.js +++ b/lib/spans/streaming-span-event.js @@ -276,8 +276,6 @@ class StreamingDatastoreSpanEvent extends StreamingSpanEvent { this.addAgentAttribute('db.instance', database_name) } - // `host` was translated in StreamingSpanEvent class - // use `server.address` to assign to `peer.hostname` if (agentAttributes.host) { this.addAgentAttribute('peer.hostname', agentAttributes.host)