Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added server.address to amqplib spans #2406

Merged
merged 2 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 4 additions & 243 deletions lib/instrumentation/amqplib/amqplib.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,17 @@
'use strict'

const {
MessageSpec,
MessageSubscribeSpec,
OperationSpec,
RecorderSpec,

params: { QueueMessageParameters, DatastoreParameters }
params: { DatastoreParameters }
} = require('../../shim/specs')
const url = require('url')
const wrapModel = require('./channel-model')
const { setCallback } = require('./utils')
const wrapChannel = require('./channel')

module.exports.instrumentPromiseAPI = instrumentChannelAPI
module.exports.instrumentCallbackAPI = instrumentCallbackAPI

const CHANNEL_METHODS = [
'close',
'open',
'assertQueue',
'checkQueue',
'deleteQueue',
'bindQueue',
'unbindQueue',
'assertExchange',
'checkExchange',
'deleteExchange',
'bindExchange',
'unbindExchange',
'cancel',
'prefetch',
'recover'
]

const TEMP_RE = /^amq\./

/**
* Register all the necessary instrumentation when using
* promise based methods
Expand Down Expand Up @@ -91,18 +70,6 @@ function instrumentAMQP(shim, amqp, promiseMode) {
wrapChannel(shim)
}

/**
* Helper to set the appropriate value of the callback property
* in the spec. If it's a promise set to null otherwise set it to `shim.LAST`
*
* @param {Shim} shim instance of shim
* @param {boolean} promiseMode is this promise based?
* @returns {string|null} appropriate value
*/
function setCallback(shim, promiseMode) {
return promiseMode ? null : shim.LAST
}

/**
*
* Instruments the connect method
Expand Down Expand Up @@ -134,209 +101,3 @@ function wrapConnect(shim, amqp, promiseMode) {
})
})
}

/**
*
* Instruments the sendOrEnqueue and sendMessage methods of the ampqlib channel.
*
* @param {Shim} shim instance of shim
*/
function wrapChannel(shim) {
const libChannel = shim.require('./lib/channel')
if (!libChannel?.Channel?.prototype) {
shim.logger.debug('Could not get Channel class to instrument.')
return
}

const proto = libChannel.Channel.prototype
if (shim.isWrapped(proto.sendMessage)) {
shim.logger.trace('Channel already instrumented.')
return
}
shim.logger.trace('Instrumenting basic Channel class.')

shim.wrap(proto, 'sendOrEnqueue', function wrapSendOrEnqueue(shim, fn) {
if (!shim.isFunction(fn)) {
return fn
}

return function wrappedSendOrEnqueue() {
const segment = shim.getSegment()
const cb = arguments[arguments.length - 1]
if (!shim.isFunction(cb) || !segment) {
shim.logger.debug({ cb: !!cb, segment: !!segment }, 'Not binding sendOrEnqueue callback')
return fn.apply(this, arguments)
}

shim.logger.trace('Binding sendOrEnqueue callback to %s', segment.name)
const args = shim.argsToArray.apply(shim, arguments)
args[args.length - 1] = shim.bindSegment(cb, segment)
return fn.apply(this, args)
}
})

shim.recordProduce(proto, 'sendMessage', function recordSendMessage(shim, fn, n, args) {
const fields = args[0]
if (!fields) {
return null
}
const isDefault = fields.exchange === ''
let exchange = 'Default'
if (!isDefault) {
exchange = TEMP_RE.test(fields.exchange) ? null : fields.exchange
}

return new MessageSpec({
destinationName: exchange,
destinationType: shim.EXCHANGE,
routingKey: fields.routingKey,
headers: fields.headers,
parameters: getParameters(Object.create(null), fields)
})
})
}

/**
* Sets the relevant message parameters
*
* @param {object} parameters object used to store the message parameters
* @param {object} fields fields from the sendMessage method
* @returns {QueueMessageParameters} parameters updated parameters
*/
function getParameters(parameters, fields) {
if (fields.routingKey) {
parameters.routing_key = fields.routingKey
}
if (fields.correlationId) {
parameters.correlation_id = fields.correlationId
}
if (fields.replyTo) {
parameters.reply_to = fields.replyTo
}

return new QueueMessageParameters(parameters)
}

/**
* Sets the QueueMessageParameters from the amqp message
*
* @param {object} message queue message
* @returns {QueueMessageParameters} parameters from message
*/
function getParametersFromMessage(message) {
const parameters = Object.create(null)
getParameters(parameters, message.fields)
getParameters(parameters, message.properties)
return parameters
}

/**
*
* Instruments the relevant channel callback_model or channel_model.
*
* @param {Shim} shim instance of shim
* @param {object} Model either channel or callback model
* @param {boolean} promiseMode is this promise based?
*/
function wrapModel(shim, Model, promiseMode) {
if (!Model.Channel?.prototype) {
shim.logger.debug(
`Could not get ${promiseMode ? 'promise' : 'callback'} model Channel to instrument.`
)
}

const proto = Model.Channel.prototype
if (shim.isWrapped(proto.consume)) {
shim.logger.trace(`${promiseMode ? 'promise' : 'callback'} model already instrumented.`)
return
}

shim.record(proto, CHANNEL_METHODS, function recordChannelMethod(shim, fn, name) {
return new RecorderSpec({
name: 'Channel#' + name,
callback: setCallback(shim, promiseMode),
promise: promiseMode
})
})

shim.recordConsume(
proto,
'get',
new MessageSpec({
destinationName: shim.FIRST,
callback: setCallback(shim, promiseMode),
promise: promiseMode,
after: function handleConsumedMessage({ shim, result, args, segment }) {
if (!shim.agent.config.message_tracer.segment_parameters.enabled) {
shim.logger.trace('Not capturing segment parameters')
return
}

// the message is the param when using the promised based model
const message = promiseMode ? result : args?.[1]
if (!message) {
shim.logger.trace('No results from consume.')
return null
}
const parameters = getParametersFromMessage(message)
shim.copySegmentParameters(segment, parameters)
}
})
)

shim.recordPurgeQueue(proto, 'purgeQueue', function recordPurge(shim, fn, name, args) {
let queue = args[0]
if (TEMP_RE.test(queue)) {
queue = null
}
return new MessageSpec({
queue,
promise: promiseMode,
callback: setCallback(shim, promiseMode)
})
})

shim.recordSubscribedConsume(
proto,
'consume',
new MessageSubscribeSpec({
name: 'amqplib.Channel#consume',
queue: shim.FIRST,
consumer: shim.SECOND,
promise: promiseMode,
callback: promiseMode ? null : shim.FOURTH,
messageHandler: describeMessage
})
)
}

/**
* Extracts the appropriate messageHandler parameters for the consume method.
*
* @param {Shim} shim instance of shim
* @param {Array} args arguments passed to the consume method
* @returns {object} message params
*/
function describeMessage(shim, args) {
const [message] = args

if (!message?.properties) {
shim.logger.debug({ message: message }, 'Failed to find message in consume arguments.')
return null
}

const parameters = getParametersFromMessage(message)
let exchangeName = message?.fields?.exchange || 'Default'

if (TEMP_RE.test(exchangeName)) {
exchangeName = null
}

return new MessageSpec({
destinationName: exchangeName,
destinationType: shim.EXCHANGE,
routingKey: message?.fields?.routingKey,
headers: message.properties.headers,
parameters
})
}
Loading
Loading