Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(amqp): publish span not transmitted when confirm cb is missing #745

Merged
merged 4 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/collector/test/tracing/messaging/amqp/amqpUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
'use strict';

exports.exchange = 'instana-test-exchange';
exports.exchangeConfirm = 'instana-test-exchange-comfirm';
exports.queueName = 'instana-test-queue';
exports.queueNameGet = 'instana-test-queue-get';
exports.queueNameConfirm = 'instana-test-queue-confirm';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ exports.sendToGetQueue = (message, headers) =>
}
});

exports.publishToConfirmChannelWithoutCallback = (message, headers) =>
request({
method: 'POST',
url: `http://127.0.0.1:${appPort}/publish-to-confirm-channel-without-callback`,
json: true,
simple: true,
headers,
body: {
message
}
});

exports.sendToConfirmQueue = (message, headers) =>
request({
method: 'POST',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require('../../../..')({

const amqp = require('amqplib');
const exchange = require('./amqpUtil').exchange;
const exchangeConfirm = require('./amqpUtil').exchangeConfirm;
const queueName = require('./amqpUtil').queueName;
const queueNameGet = require('./amqpUtil').queueNameGet;
const queueNameConfirm = require('./amqpUtil').queueNameConfirm;
Expand Down Expand Up @@ -57,6 +58,9 @@ amqp
.then(() => connection.createConfirmChannel())
.then(_confirmChannel => {
confirmChannel = _confirmChannel;
return confirmChannel.assertExchange(exchangeConfirm, 'fanout', { durable: false });
})
.then(() => {
return confirmChannel.assertQueue(queueNameConfirm, { durable: false });
})
.then(() => {
Expand Down Expand Up @@ -120,6 +124,24 @@ app.post('/send-to-get-queue', (req, res) => {
});
});

app.post('/publish-to-confirm-channel-without-callback', (req, res) => {
// @golevelup/nestjs-rabbitmq 3.3.0 did not pass the confirm callback
// eslint-disable-next-line max-len
// https://github.com/golevelup/nestjs/blob/%40golevelup/nestjs-rabbitmq%403.3.0/packages/rabbitmq/src/amqp/connection.ts#L541
// https://github.com/jwalton/node-amqp-connection-manager/blob/v4.1.11/src/ChannelWrapper.ts#L13
// https://github.com/amqp-node/amqplib/blob/v0.10.3/lib/channel_model.js#L265
confirmChannel.publish(exchangeConfirm, '', Buffer.from(req.body.message));

request(`http://127.0.0.1:${agentPort}`)
.then(() => {
res.status(201).send('OK');
})
.catch(err2 => {
log(err2);
res.sendStatus(500);
});
});

app.post('/send-to-confirm-queue', (req, res) => {
// Even with a ConfirmChannel and even in the promise API case, sendToQueue/publish do not return a promise, but only
// accept a callback, see
Expand Down
16 changes: 16 additions & 0 deletions packages/collector/test/tracing/messaging/amqp/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,22 @@ const mochaSuiteFn = supportedVersion(process.versions.node) ? describe : descri
)
));

if (apiType === 'Promises') {
it('must record an exit span for ConfirmChannel#publish without confirm callback', () =>
publisherControls.publishToConfirmChannelWithoutCallback('Ohai!').then(() =>
retry(() =>
agentControls.getSpans().then(spans => {
const httpEntry = verifyHttpEntry(spans);
const rabbitMqExit = verifyRabbitMqExit(spans, httpEntry);
expect(rabbitMqExit.data.rabbitmq.exchange).to.eql('instana-test-exchange-comfirm');
expect(rabbitMqExit.data.rabbitmq.key).to.not.exist;
verifyHttpExit(spans, httpEntry);
})
)
));
}

// `sendToQueue` calls .publish internally
it('must record an exit span for ConfirmChannel#sendToQueue', () =>
publisherControls.sendToConfirmQueue('Ohai!').then(() =>
retry(() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ function instrumentedSendMessage(ctx, originalSendMessage, originalArgs) {
propagateSuppression(originalArgs[0]);
propagateSuppression(originalArgs[1]);
}

return originalSendMessage.apply(ctx, originalArgs);
}

Expand All @@ -71,6 +70,7 @@ function instrumentedSendMessage(ctx, originalSendMessage, originalArgs) {
return originalSendMessage.apply(ctx, originalArgs);
}

// instrumentedChannelModelPublish starts the span without data. The fn is responsible to transmit the span.
if (isExitSpan && parentSpan.n === 'rabbitmq') {
// if ConfirmChannel#publish/sendToQueue has been invoked, we have already created a new cls context in
// instrumentedChannelModelPublish and must not do so again here.
Expand Down Expand Up @@ -412,6 +412,7 @@ function instrumentedChannelModelPublish(ctx, originalFunction, originalArgs) {

return cls.ns.runAndReturn(() => {
const span = cls.startSpan('rabbitmq', constants.EXIT);

// everything else is handled in instrumentedSendMessage/processExitSpan
if (originalArgs.length >= 5 && typeof originalArgs[4] === 'function') {
const originalCb = originalArgs[4];
Expand All @@ -420,7 +421,12 @@ function instrumentedChannelModelPublish(ctx, originalFunction, originalArgs) {
span.transmit();
originalCb.apply(this, arguments);
});
} else {
// CASE: confirm callback missing. amqplib does not throw any error, just transmit the span
span.d = Date.now() - span.ts;
span.transmit();
kirrg001 marked this conversation as resolved.
Show resolved Hide resolved
}

return originalFunction.apply(ctx, originalArgs);
});
}
Expand Down