Skip to content

Commit

Permalink
chore: extended tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kirrg001 committed Jan 27, 2023
1 parent 8686f05 commit c86c84f
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const awsRegion = 'us-east-2';
const port = process.env.APP_PORT || 3216;
const agentPort = process.env.INSTANA_AGENT_PORT || 42699;
const withError = process.env.AWS_SQS_RECEIVER_ERROR === 'true';
const handleMessageBatch = process.env.HANDLE_MESSAGE_BATCH === 'true';
const app = express();

const queueURL = process.env.AWS_SQS_QUEUE_URL;
Expand All @@ -40,23 +41,53 @@ function log() {

const sqs = new awsSdk3.SQSClient({ region: awsRegion });

const consumerApp = Consumer.create({
queueUrl: queueURL,
sqs,
handleMessage: async message => {
// make sure the span took at least one second to complete
await delay(1000);
sendToParent(message);
await delay(200);

await request(`http://localhost:${agentPort}?msg=${message.Body}`);
log(`Sent an HTTP request after receiving message of id ${message.MessageId}`);

if (withError) {
throw new Error('Forced error');
}
const handleMessageFn = async message => {
// make sure the span took at least one second to complete
await delay(1000);
sendToParent(message);
await delay(200);

await request(`http://localhost:${agentPort}?msg=${message.Body}`);
log(`Sent an HTTP request after receiving message of id ${message.MessageId}`);

if (withError) {
throw new Error('Forced error');
}
});
};

const handleMessageBatchFn = async messages => {
// make sure the span took at least one second to complete
await delay(1000);

messages.forEach(async function (m) {
sendToParent(m);
await request(`http://localhost:${agentPort}?msg=${m.Body}`);
log(`Sent an HTTP request after receiving message of id ${m.MessageId}`);
});

await delay(200);

if (withError) {
throw new Error('Forced error');
}
};
const fn = handleMessageBatch ? { handleMessageBatch: handleMessageBatchFn } : { handleMessage: handleMessageFn };

const consumerApp = Consumer.create(
Object.assign(
{
// MaxNumberOfMessages
batchSize: 10,
waitTimeSeconds: 2,
// We sometimes receives Messages: undefined, which made the tests flaky
// https://github.com/aws/aws-sdk-js-v3/issues/1394
visibilityTimeout: 1,
queueUrl: queueURL,
sqs
},
fn
)
);

app.get('/', (_req, res) => {
if (hasStartedPolling) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ const {
expectAtLeastOneMatching,
retry,
delay,
stringifyItems
stringifyItems,
expectExactlyNMatching
} = require('../../../../../../../core/test/test_util');
const ProcessControls = require('../../../../../test_util/ProcessControls');
const globalAgent = require('../../../../../globalAgent');
Expand Down Expand Up @@ -191,7 +192,7 @@ function start(version) {
// See https://github.com/bbc/sqs-consumer/issues/356
if (version !== '@aws-sdk/client-sqs' && semver.gte(process.versions.node, '14.0.0')) {
describe('sqs-consumer API', () => {
describe('message processed with success', () => {
describe('[handleMessage] message processed with success', () => {
const sqsConsumerControls = new ProcessControls({
appPath: path.join(__dirname, 'sqs-consumer'),
port: 3216,
Expand Down Expand Up @@ -221,6 +222,71 @@ function start(version) {
isBatch: false
});
});

it('receives messages', async () => {
const response = await senderControlsSQSConsumer.sendRequest({
method: 'GET',
path: `${apiPath}?isBatch=true`
});

await verify({
receiverControls: sqsConsumerControls,
senderControls: senderControlsSQSConsumer,
response,
apiPath,
withError: false,
isBatch: true
});
});
});

describe('[handleMessageBatch] message processed with success', () => {
const sqsConsumerControls = new ProcessControls({
appPath: path.join(__dirname, 'sqs-consumer'),
port: 3216,
useGlobalAgent: true,
env: {
AWS_SQS_QUEUE_URL: `${queueUrlPrefix}${queueName}-consumer`,
AWS_SDK_CLIENT_SQS_REQUIRE: version,
HANDLE_MESSAGE_BATCH: true
}
});

ProcessControls.setUpHooksWithRetryTime(retryTime, sqsConsumerControls);

const apiPath = '/send-message/v3';

it('receives message', async () => {
const response = await senderControlsSQSConsumer.sendRequest({
method: 'GET',
path: apiPath
});
await verify({
receiverControls: sqsConsumerControls,
senderControls: senderControlsSQSConsumer,
response,
apiPath,
withError: false,
isBatch: false
});
});

it('receives messages', async () => {
const response = await senderControlsSQSConsumer.sendRequest({
method: 'GET',
path: `${apiPath}?isBatch=true`
});

await verify({
receiverControls: sqsConsumerControls,
senderControls: senderControlsSQSConsumer,
response,
apiPath,
withError: false,
isBatch: true,
isSQSConsumer: true
});
});
});

describe('message not processed with success', () => {
Expand Down Expand Up @@ -478,18 +544,18 @@ function start(version) {
});
});

async function verify({ receiverControls, senderControls, response, apiPath, withError, isBatch }) {
async function verify({ receiverControls, senderControls, response, apiPath, withError, isBatch, isSQSConsumer }) {
if (withError === 'sender') {
expect(response.error).to.equal('MissingParameter: The request must contain the parameter MessageBody.');
} else {
await retry(async () => {
if (isBatch) {
verifyResponseAndBatchMessage(response, receiverControls);
verifyResponseAndBatchMessage(response, receiverControls, isSQSConsumer);
} else {
verifyResponseAndMessage(response, receiverControls);
}
const spans = await agentControls.getSpans();
verifySpans({ receiverControls, senderControls, spans, apiPath, withError, isBatch });
verifySpans({ receiverControls, senderControls, spans, apiPath, withError, isBatch, isSQSConsumer });
}, retryTime);
}
}
Expand Down Expand Up @@ -530,14 +596,30 @@ function start(version) {
}, retryTime);
}

function verifySpans({ receiverControls, senderControls, spans, apiPath, withError, isBatch }) {
function verifySpans({ receiverControls, senderControls, spans, apiPath, withError, isBatch, isSQSConsumer }) {
const httpEntry = verifyHttpRootEntry({ spans, apiPath, pid: String(senderControls.getPid()) });
const sqsExit = verifySQSExit({ senderControls, spans, parent: httpEntry, withError });
verifyHttpExit({ spans, parent: httpEntry, pid: String(senderControls.getPid()) });

if (withError !== 'publisher') {
const sqsEntry = verifySQSEntry({ receiverControls, spans, parent: sqsExit, withError, isBatch });
verifyHttpExit({ spans, parent: sqsEntry, pid: String(receiverControls.getPid()) });

if (isSQSConsumer) {
verifyHttpExit({
spans,
parent: sqsEntry,
pid: String(receiverControls.getPid()),
testMethod: (exitSpans, tests) => {
return expectExactlyNMatching(exitSpans, 4, tests);
}
});
} else {
verifyHttpExit({
spans,
parent: sqsEntry,
pid: String(receiverControls.getPid())
});
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,37 @@ function instrument(SQSConsumer) {
return orig.apply(this, arguments);
};
});

shimmer.wrap(SQSConsumer.Consumer.prototype, 'executeBatchHandler', function (orig) {
return function instanaExecuteBatchHandler() {
const messages = arguments[0];

if (messages) {
const message = messages.find(msg => msg.instanaAsyncContext);

return cls.runInAsyncContext(message.instanaAsyncContext, () => {
const span = cls.getCurrentSpan();
span.disableAutoEnd();
const res = orig.apply(this, arguments);

res
.then(() => {
span.d = Date.now() - span.ts;
span.transmitManual();
})
.catch(err => {
span.ec = 1;
span.data.sqs.error = err.message || err.code || JSON.stringify(err);
span.d = Date.now() - span.ts;
span.transmitManual();
});

return res;
});
}
return orig.apply(this, arguments);
};
});
}

module.exports = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,11 @@ class InstanaAWSSQS extends InstanaAWSProduct {
span.cancel();
}

// NOTE: attach the async context to continue with it if sqs-consumer is used, see index.js
// sqs-consumer messages are dependend on the customer's `handleMessage` function
// NOTE: attach the async context to the last message to be able to
// finish the span with the correct end time and error in the sqs-consuemr `handleMessage` function.
// 1x ReceiveMessageCommand with multiple messages (batchSize>1) == 1 sqs entry with size 4
if (data && data.Messages && data.Messages) {
data.Messages.forEach(message => {
message.instanaAsyncContext = cls.getAsyncContext();
});
data.Messages[data.Messages.length - 1].instanaAsyncContext = cls.getAsyncContext();
}

return data;
Expand Down

0 comments on commit c86c84f

Please sign in to comment.