Skip to content

Commit

Permalink
feat: Added server.address to amqplib spans
Browse files Browse the repository at this point in the history
  • Loading branch information
bizob2828 committed Jul 26, 2024
1 parent a53085d commit 2cb9521
Show file tree
Hide file tree
Showing 16 changed files with 463 additions and 295 deletions.
247 changes: 4 additions & 243 deletions lib/instrumentation/amqplib/amqplib.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,17 @@
'use strict'

const {
MessageSpec,
MessageSubscribeSpec,
OperationSpec,
RecorderSpec,

params: { QueueMessageParameters, DatastoreParameters }
params: { DatastoreParameters }
} = require('../../shim/specs')
const url = require('url')
const wrapModel = require('./channel-model')
const { setCallback } = require('./utils')
const wrapChannel = require('./channel')

module.exports.instrumentPromiseAPI = instrumentChannelAPI
module.exports.instrumentCallbackAPI = instrumentCallbackAPI

const CHANNEL_METHODS = [
'close',
'open',
'assertQueue',
'checkQueue',
'deleteQueue',
'bindQueue',
'unbindQueue',
'assertExchange',
'checkExchange',
'deleteExchange',
'bindExchange',
'unbindExchange',
'cancel',
'prefetch',
'recover'
]

const TEMP_RE = /^amq\./

/**
* Register all the necessary instrumentation when using
* promise based methods
Expand Down Expand Up @@ -91,18 +70,6 @@ function instrumentAMQP(shim, amqp, promiseMode) {
wrapChannel(shim)
}

/**
* Helper to set the appropriate value of the callback property
* in the spec. If it's a promise set to null otherwise set it to `shim.LAST`
*
* @param {Shim} shim instance of shim
* @param {boolean} promiseMode is this promise based?
* @returns {string|null} appropriate value
*/
function setCallback(shim, promiseMode) {
return promiseMode ? null : shim.LAST
}

/**
*
* Instruments the connect method
Expand Down Expand Up @@ -134,209 +101,3 @@ function wrapConnect(shim, amqp, promiseMode) {
})
})
}

/**
*
* Instruments the sendOrEnqueue and sendMessage methods of the ampqlib channel.
*
* @param {Shim} shim instance of shim
*/
function wrapChannel(shim) {
const libChannel = shim.require('./lib/channel')
if (!libChannel?.Channel?.prototype) {
shim.logger.debug('Could not get Channel class to instrument.')
return
}

const proto = libChannel.Channel.prototype
if (shim.isWrapped(proto.sendMessage)) {
shim.logger.trace('Channel already instrumented.')
return
}
shim.logger.trace('Instrumenting basic Channel class.')

shim.wrap(proto, 'sendOrEnqueue', function wrapSendOrEnqueue(shim, fn) {
if (!shim.isFunction(fn)) {
return fn
}

return function wrappedSendOrEnqueue() {
const segment = shim.getSegment()
const cb = arguments[arguments.length - 1]
if (!shim.isFunction(cb) || !segment) {
shim.logger.debug({ cb: !!cb, segment: !!segment }, 'Not binding sendOrEnqueue callback')
return fn.apply(this, arguments)
}

shim.logger.trace('Binding sendOrEnqueue callback to %s', segment.name)
const args = shim.argsToArray.apply(shim, arguments)
args[args.length - 1] = shim.bindSegment(cb, segment)
return fn.apply(this, args)
}
})

shim.recordProduce(proto, 'sendMessage', function recordSendMessage(shim, fn, n, args) {
const fields = args[0]
if (!fields) {
return null
}
const isDefault = fields.exchange === ''
let exchange = 'Default'
if (!isDefault) {
exchange = TEMP_RE.test(fields.exchange) ? null : fields.exchange
}

return new MessageSpec({
destinationName: exchange,
destinationType: shim.EXCHANGE,
routingKey: fields.routingKey,
headers: fields.headers,
parameters: getParameters(Object.create(null), fields)
})
})
}

/**
* Sets the relevant message parameters
*
* @param {object} parameters object used to store the message parameters
* @param {object} fields fields from the sendMessage method
* @returns {QueueMessageParameters} parameters updated parameters
*/
function getParameters(parameters, fields) {
if (fields.routingKey) {
parameters.routing_key = fields.routingKey
}
if (fields.correlationId) {
parameters.correlation_id = fields.correlationId
}
if (fields.replyTo) {
parameters.reply_to = fields.replyTo
}

return new QueueMessageParameters(parameters)
}

/**
* Sets the QueueMessageParameters from the amqp message
*
* @param {object} message queue message
* @returns {QueueMessageParameters} parameters from message
*/
function getParametersFromMessage(message) {
const parameters = Object.create(null)
getParameters(parameters, message.fields)
getParameters(parameters, message.properties)
return parameters
}

/**
*
* Instruments the relevant channel callback_model or channel_model.
*
* @param {Shim} shim instance of shim
* @param {object} Model either channel or callback model
* @param {boolean} promiseMode is this promise based?
*/
function wrapModel(shim, Model, promiseMode) {
if (!Model.Channel?.prototype) {
shim.logger.debug(
`Could not get ${promiseMode ? 'promise' : 'callback'} model Channel to instrument.`
)
}

const proto = Model.Channel.prototype
if (shim.isWrapped(proto.consume)) {
shim.logger.trace(`${promiseMode ? 'promise' : 'callback'} model already instrumented.`)
return
}

shim.record(proto, CHANNEL_METHODS, function recordChannelMethod(shim, fn, name) {
return new RecorderSpec({
name: 'Channel#' + name,
callback: setCallback(shim, promiseMode),
promise: promiseMode
})
})

shim.recordConsume(
proto,
'get',
new MessageSpec({
destinationName: shim.FIRST,
callback: setCallback(shim, promiseMode),
promise: promiseMode,
after: function handleConsumedMessage({ shim, result, args, segment }) {
if (!shim.agent.config.message_tracer.segment_parameters.enabled) {
shim.logger.trace('Not capturing segment parameters')
return
}

// the message is the param when using the promised based model
const message = promiseMode ? result : args?.[1]
if (!message) {
shim.logger.trace('No results from consume.')
return null
}
const parameters = getParametersFromMessage(message)
shim.copySegmentParameters(segment, parameters)
}
})
)

shim.recordPurgeQueue(proto, 'purgeQueue', function recordPurge(shim, fn, name, args) {
let queue = args[0]
if (TEMP_RE.test(queue)) {
queue = null
}
return new MessageSpec({
queue,
promise: promiseMode,
callback: setCallback(shim, promiseMode)
})
})

shim.recordSubscribedConsume(
proto,
'consume',
new MessageSubscribeSpec({
name: 'amqplib.Channel#consume',
queue: shim.FIRST,
consumer: shim.SECOND,
promise: promiseMode,
callback: promiseMode ? null : shim.FOURTH,
messageHandler: describeMessage
})
)
}

/**
* Extracts the appropriate messageHandler parameters for the consume method.
*
* @param {Shim} shim instance of shim
* @param {Array} args arguments passed to the consume method
* @returns {object} message params
*/
function describeMessage(shim, args) {
const [message] = args

if (!message?.properties) {
shim.logger.debug({ message: message }, 'Failed to find message in consume arguments.')
return null
}

const parameters = getParametersFromMessage(message)
let exchangeName = message?.fields?.exchange || 'Default'

if (TEMP_RE.test(exchangeName)) {
exchangeName = null
}

return new MessageSpec({
destinationName: exchangeName,
destinationType: shim.EXCHANGE,
routingKey: message?.fields?.routingKey,
headers: message.properties.headers,
parameters
})
}
Loading

0 comments on commit 2cb9521

Please sign in to comment.