Skip to content

Commit

Permalink
feat: Added instrumentation for kafkajs.Kafka.consumer (#2244)
Browse files Browse the repository at this point in the history
  • Loading branch information
bizob2828 authored Jun 4, 2024
1 parent 4079a0a commit 8b1fa5d
Show file tree
Hide file tree
Showing 14 changed files with 416 additions and 56 deletions.
1 change: 0 additions & 1 deletion lib/instrumentation/kafkajs.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@
*/

'use strict'

module.exports = require('./kafkajs/index')
101 changes: 101 additions & 0 deletions lib/instrumentation/kafkajs/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'
const { kafkaCtx } = require('../../symbols')
const { MessageSpec, MessageSubscribeSpec, RecorderSpec } = require('../../shim/specs')
const { DESTINATIONS } = require('../../config/attribute-filter')
const CONSUMER_METHODS = [
'connect',
'disconnect',
'subscribe',
'stop',
'commitOffsets',
'seek',
'pause',
'resume'
]
const SEGMENT_PREFIX = 'kafkajs.Kafka.consumer#'

module.exports = function instrumentConsumer({ shim, kafkajs }) {
shim.wrap(kafkajs.Kafka.prototype, 'consumer', function wrapConsumer(shim, orig) {
return function wrappedConsumer() {
const args = shim.argsToArray.apply(shim, arguments)
const consumer = orig.apply(this, args)
consumer.on(consumer.events.REQUEST, function listener(data) {
// storing broker for when we add `host`, `port` to messaging spans
consumer[kafkaCtx] = {
clientId: data?.payload?.clientId,
broker: data?.payload.broker
}
})
shim.record(consumer, CONSUMER_METHODS, function wrapper(shim, fn, name) {
return new RecorderSpec({
name: `${SEGMENT_PREFIX}${name}`,
promise: true
})
})
shim.recordSubscribedConsume(
consumer,
'run',
new MessageSubscribeSpec({
name: `${SEGMENT_PREFIX}#run`,
destinationType: shim.TOPIC,
promise: true,
consumer: shim.FIRST,
functions: ['eachMessage'],
messageHandler: handler.bind(null, consumer)
})
)
return consumer
}
})
}

/**
* Message handler that extracts the topic and headers from message being consumed.
*
* This also sets some metrics for byte length of message, and number of messages.
* Lastly, adds tx attributes for byteCount and clientId
*
* @param {object} consumer the instance of kafka consumer
* @param {MessageShim} shim instance of shim
* @param {Array} args arguments passed to the `eachMessage` function of the `consumer.run`
* @returns {MessageSpec} spec for message handling
*/
function handler(consumer, shim, args) {
const [data] = args
const { topic } = data
const segment = shim.getActiveSegment()

if (segment?.transaction) {
const tx = segment.transaction
const byteLength = data?.message.value?.byteLength
const metricPrefix = `Message/Kafka/Topic/Named/${topic}/Received`
// This will always be 1
tx.metrics.getOrCreateMetric(`${metricPrefix}/Messages`).recordValue(1)
if (byteLength) {
tx.metrics.measureBytes(`${metricPrefix}/Bytes`, byteLength)
tx.trace.attributes.addAttribute(
DESTINATIONS.TRANS_SCOPE,
'kafka.consume.byteCount',
byteLength
)
}
if (consumer?.[kafkaCtx]) {
tx.trace.attributes.addAttribute(
DESTINATIONS.TRANS_EVENT,
'kafka.consume.client_id',
consumer[kafkaCtx].clientId
)
}
}

return new MessageSpec({
destinationType: `Topic/Consume`,
destinationName: data?.topic,
headers: data?.message?.headers
})
}
4 changes: 2 additions & 2 deletions lib/instrumentation/kafkajs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
'use strict'

const instrumentProducer = require('./producer')
const instrumentConsumer = require('./consumer')

// eslint-disable-next-line no-unused-vars
module.exports = function initialize(agent, kafkajs, _moduleName, shim) {
if (agent.config.feature_flag.kafkajs_instrumentation === false) {
shim.logger.debug(
Expand All @@ -17,6 +17,6 @@ module.exports = function initialize(agent, kafkajs, _moduleName, shim) {
}

shim.setLibrary(shim.KAFKA)

instrumentConsumer({ shim, kafkajs })
instrumentProducer({ shim, kafkajs })
}
2 changes: 2 additions & 0 deletions lib/instrumentation/kafkajs/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ module.exports = function instrumentProducer({ shim, kafkajs }) {
}

return new MessageSpec({
promise: true,
destinationName: data.topic,
destinationType: shim.TOPIC,
headers: firstMessage.headers
Expand All @@ -45,6 +46,7 @@ module.exports = function instrumentProducer({ shim, kafkajs }) {
}

return new MessageSpec({
promise: true,
destinationName: data.topicMessages[0].topic,
destinationType: shim.TOPIC,
headers: firstMessage.headers
Expand Down
11 changes: 10 additions & 1 deletion lib/shim/message-shim/subscribe-consume.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,22 @@ function createSubscriberWrapper({ shim, fn, spec, destNameIsArg }) {
}
}

if (consumerIdx !== null) {
if (consumerIdx !== null && !spec.functions) {
args[consumerIdx] = shim.wrap(
args[consumerIdx],
makeWrapConsumer({ spec, queue, destinationName, destNameIsArg })
)
}

if (consumerIdx !== null && spec.functions) {
spec.functions.forEach((fn) => {
args[consumerIdx][fn] = shim.wrap(
args[consumerIdx][fn],
makeWrapConsumer({ spec, queue, destinationName, destNameIsArg })
)
})
}

return fn.apply(this, args)
}
}
Expand Down
25 changes: 25 additions & 0 deletions lib/shim/specs/message-subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,30 @@ class MessageSubscribeSpec extends MessageSpec {
*/
consumer

/**
* Indicates names of functions to be wrapped for message consumption.
* This must be used in tandem with consumer.
* @type {Array<string>|null}
* @example
* // Wrap the eachMessage method on a consumer
* class Consumer() {
* constructor() {}
* async run(consumer) {
* consumer.eachMessage({ message })
* }
* }
*
* const spec = new MessageSubscribeSpec({
* name: 'Consumer#run'
* promise: true
* consumer: shim.FIRST,
* functions: ['eachMessage']
* })
*
* shim.recordSubscribedConsume(Consumer.prototype, 'run', spec)
*/
functions

/* eslint-disable jsdoc/require-param-description */
/**
* @param {MessageSubscribeSpecParams} params
Expand All @@ -35,6 +59,7 @@ class MessageSubscribeSpec extends MessageSpec {
super(params)

this.consumer = params.consumer ?? null
this.functions = Array.isArray(params.functions) ? params.functions : null
}
}

Expand Down
1 change: 1 addition & 0 deletions lib/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module.exports = {
databaseName: Symbol('databaseName'),
disableDT: Symbol('Disable distributed tracing'), // description for backwards compatibility
executorContext: Symbol('executorContext'),
kafkaCtx: Symbol('kafkaCtx'),
koaBody: Symbol('body'),
koaBodySet: Symbol('bodySet'),
koaRouter: Symbol('koaRouter'),
Expand Down
9 changes: 8 additions & 1 deletion lib/transaction/tracecontext.js
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ class TraceContext {
return traceParentInfo
}

if (Buffer.isBuffer(traceparent)) {
traceparent = traceparent.toString()
}
const trimmed = traceparent.trim()
const parts = trimmed.split('-')

Expand Down Expand Up @@ -445,10 +448,14 @@ class TraceContext {
}

_parseTraceState(params) {
const { tracestate, hasTrustKey, expectedNrKey } = params
const { hasTrustKey, expectedNrKey } = params
let { tracestate } = params
let nrTraceStateValue = null
const finalListMembers = []
const vendors = []
if (Buffer.isBuffer(tracestate)) {
tracestate = tracestate.toString()
}
const incomingListMembers = tracestate.split(',')
for (let i = 0; i < incomingListMembers.length; i++) {
const listMember = incomingListMembers[i].trim()
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@
"scripts": {
"bench": "node ./bin/run-bench.js",
"docker-env": "./bin/docker-env-vars.sh",
"docs": "npm ci && jsdoc -c ./jsdoc-conf.json --private -r .",
"docs": "rm -rf ./out && jsdoc -c ./jsdoc-conf.jsonc --private -r .",
"integration": "npm run prepare-test && npm run sub-install && time c8 -o ./coverage/integration tap --test-regex='(\\/|^test\\/integration\\/.*\\.tap\\.js)$' --timeout=600 --no-coverage --reporter classic",
"integration:esm": "time c8 -o ./coverage/integration-esm tap --node-arg='--loader=./esm-loader.mjs' --test-regex='(test\\/integration\\/.*\\.tap\\.mjs)$' --timeout=600 --no-coverage --reporter classic",
"prepare-test": "npm run ssl && npm run docker-env",
Expand Down
31 changes: 31 additions & 0 deletions test/unit/distributed_tracing/tracecontext.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ tap.test('TraceContext', function (t) {
t.equal(traceContext._validateAndParseTraceParentHeader(shorterStr).entryValid, false)
t.end()
})

t.test('should handle if traceparent is a buffer', (t) => {
const { traceContext } = t.context
const traceparent = '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00'
const bufferTraceParent = Buffer.from(traceparent, 'utf8')
t.ok(traceContext._validateAndParseTraceParentHeader(bufferTraceParent).entryValid)
t.end()
})
})

t.test('_validateAndParseTraceStateHeader', (t) => {
Expand Down Expand Up @@ -283,6 +291,29 @@ tap.test('TraceContext', function (t) {
t.end()
})

t.test('should pass a valid tracestate header if a buffer', (t) => {
const { agent, traceContext } = t.context
agent.config.trusted_account_key = '190'
const goodTraceStateHeader =
/* eslint-disable-next-line max-len */
'190@nr=0-0-709288-8599547-f85f42fd82a4cf1d-164d3b4b0d09cb05-1-0.789-1563574856827,234234@foo=bar'
const bufferTraceState = Buffer.from(goodTraceStateHeader, 'utf8')
const valid = traceContext._validateAndParseTraceStateHeader(bufferTraceState)
t.ok(valid)
t.equal(valid.entryFound, true)
t.equal(valid.entryValid, true)
t.equal(valid.intrinsics.version, 0)
t.equal(valid.intrinsics.parentType, 'App')
t.equal(valid.intrinsics.accountId, '709288')
t.equal(valid.intrinsics.appId, '8599547')
t.equal(valid.intrinsics.spanId, 'f85f42fd82a4cf1d')
t.equal(valid.intrinsics.transactionId, '164d3b4b0d09cb05')
t.equal(valid.intrinsics.sampled, true)
t.equal(valid.intrinsics.priority, 0.789)
t.equal(valid.intrinsics.timestamp, 1563574856827)
t.end()
})

t.test('should fail mismatched trusted account ID in tracestate header', (t) => {
const { agent, traceContext } = t.context
agent.config.trusted_account_key = '666'
Expand Down
28 changes: 28 additions & 0 deletions test/unit/shim/message-shim.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1170,5 +1170,33 @@ tap.test('MessageShim', function (t) {
t.ok(parent)
})
})

t.test('should wrap object key of consumer', function (t) {
t.plan(3)
const message = { foo: 'bar' }
const subscriber = function subscriber(consumer) {
consumer.eachMessage(message)
}
const wrapped = shim.recordSubscribedConsume(subscriber, {
name: 'Channel#subscribe',
consumer: shim.FIRST,
functions: ['eachMessage'],
messageHandler: function (shim, args) {
t.same(args[0], message)
return {
destinationName: 'exchange.foo',
destinationType: shim.EXCHANGE
}
}
})
wrapped({
eachMessage: function consumer(msg) {
const segment = shim.getSegment()
t.equal(segment.name, 'OtherTransaction/Message/RabbitMQ/Exchange/Named/exchange.foo')
t.equal(msg, message)
t.end()
}
})
})
})
})
14 changes: 14 additions & 0 deletions test/unit/shim/shim.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3149,4 +3149,18 @@ tap.test('Shim', function (t) {
t.ok(shim.specs.params.QueueMessageParameters)
t.end()
})

t.test('should not use functions in MessageSubscribeSpec if it is not an array', (t) => {
const agent = helper.loadMockedAgent()
t.teardown(() => {
helper.unloadAgent(agent)
})

const shim = new Shim(agent, 'test-mod')
const spec = new shim.specs.MessageSubscribeSpec({
functions: 'foo-bar'
})
t.notOk(spec.functions)
t.end()
})
})
Loading

0 comments on commit 8b1fa5d

Please sign in to comment.