Skip to content

Commit

Permalink
feat(collector): added support for [email protected] (#691)
Browse files Browse the repository at this point in the history
refs 113049
  • Loading branch information
kirrg001 authored Jan 27, 2023
1 parent f39391a commit f8bf9e7
Show file tree
Hide file tree
Showing 7 changed files with 2,465 additions and 11 deletions.
2,096 changes: 2,096 additions & 0 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
"sinon": "^11.1.1",
"sinon-chai": "^3.7.0",
"sqs-consumer": "5.7.0",
"sqs-consumer-v6": "npm:sqs-consumer@^6.2.0",
"superagent": "^6.1.0",
"tsoa": "^3.14.1",
"typescript": "^4.5.2",
Expand Down
115 changes: 115 additions & 0 deletions packages/collector/test/tracing/cloud/aws-sdk/v3/sqs/sqs-consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* (c) Copyright IBM Corp. 2023
*/

'use strict';

const mock = require('mock-require');

if (process.env.AWS_SDK_CLIENT_SQS_REQUIRE !== '@aws-sdk/client-sqs') {
mock('@aws-sdk/client-sqs', process.env.AWS_SDK_CLIENT_SQS_REQUIRE);
}

mock('sqs-consumer', 'sqs-consumer-v6');

const instana = require('../../../../../../src')();
const express = require('express');
const awsSdk3 = require('@aws-sdk/client-sqs');
const { Consumer } = require('sqs-consumer');
const request = require('request-promise');
const { sendToParent } = require('../../../../../../../core/test/test_util');
const delay = require('../../../../../../../core/test/test_util/delay');

const awsRegion = 'us-east-2';
const port = process.env.APP_PORT || 3216;
const agentPort = process.env.INSTANA_AGENT_PORT || 42699;
const withError = process.env.AWS_SQS_RECEIVER_ERROR === 'true';
const handleMessageBatch = process.env.HANDLE_MESSAGE_BATCH === 'true';
const app = express();

const queueURL = process.env.AWS_SQS_QUEUE_URL;
const logPrefix = `AWS SQS Consumer API (${process.pid}):\t`;
let hasStartedPolling = false;

function log() {
/* eslint-disable no-console */
const args = Array.prototype.slice.call(arguments);
args[0] = `${logPrefix}${args[0]}`;
console.log.apply(console, args);
/* eslint-enable no-console */
}

const sqs = new awsSdk3.SQSClient({ region: awsRegion });

const handleMessageFn = async message => {
// make sure the span took at least one second to complete
await delay(1000);
sendToParent(message);
await delay(200);

await request(`http://localhost:${agentPort}?msg=${message.Body}`);
log(`Sent an HTTP request after receiving message of id ${message.MessageId}`);

if (withError) {
throw new Error('Forced error');
}
};

const handleMessageBatchFn = async messages => {
// make sure the span took at least one second to complete
await delay(1000);

messages.forEach(async function (m) {
sendToParent(m);
await request(`http://localhost:${agentPort}?msg=${m.Body}`);
log(`Sent an HTTP request after receiving message of id ${m.MessageId}`);
});

await delay(200);

if (withError) {
throw new Error('Forced error');
}
};
const fn = handleMessageBatch ? { handleMessageBatch: handleMessageBatchFn } : { handleMessage: handleMessageFn };

const consumerApp = Consumer.create(
Object.assign(
{
// MaxNumberOfMessages
batchSize: 10,
waitTimeSeconds: 2,
// We sometimes receives Messages: undefined, which made the tests flaky
// https://github.com/aws/aws-sdk-js-v3/issues/1394
visibilityTimeout: 1,
queueUrl: queueURL,
sqs
},
fn
)
);

app.get('/', (_req, res) => {
if (hasStartedPolling) {
res.send('OK');
} else {
res.status(500).send('Not ready yet.');
}
});

app.listen(port, () => {
log(`App started at port ${port}`);
});

async function startPollingWhenReady() {
// Make sure we are connected to the agent before calling sqs.receiveMessage for the first time.
if (instana.isConnected()) {
consumerApp.start();
hasStartedPolling = true;
} else {
await delay(50);
setImmediate(startPollingWhenReady);
}
}

startPollingWhenReady();
167 changes: 157 additions & 10 deletions packages/collector/test/tracing/cloud/aws-sdk/v3/sqs/test_definition.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ const {
expectAtLeastOneMatching,
retry,
delay,
stringifyItems
stringifyItems,
expectExactlyNMatching
} = require('../../../../../../../core/test/test_util');
const ProcessControls = require('../../../../../test_util/ProcessControls');
const globalAgent = require('../../../../../globalAgent');
Expand Down Expand Up @@ -188,10 +189,140 @@ function start(version) {
});
});

/**
* At the moment, SQS-Consumer does not support AWS SDK v3, although a PR exists in their repository:
* https://github.com/bbc/sqs-consumer/pull/252
*/
// See https://github.com/bbc/sqs-consumer/issues/356
if (version !== '@aws-sdk/client-sqs' && semver.gte(process.versions.node, '14.0.0')) {
describe('sqs-consumer API', () => {
describe('[handleMessage] message processed with success', () => {
const sqsConsumerControls = new ProcessControls({
appPath: path.join(__dirname, 'sqs-consumer'),
port: 3216,
useGlobalAgent: true,
env: {
AWS_SQS_QUEUE_URL: `${queueUrlPrefix}${queueName}-consumer`,
AWS_SDK_CLIENT_SQS_REQUIRE: version
}
});

ProcessControls.setUpHooksWithRetryTime(retryTime, sqsConsumerControls);

const apiPath = '/send-message/v3';

it('receives message', async () => {
const response = await senderControlsSQSConsumer.sendRequest({
method: 'GET',
path: apiPath
});

await verify({
receiverControls: sqsConsumerControls,
senderControls: senderControlsSQSConsumer,
response,
apiPath,
withError: false,
isBatch: false
});
});

it('receives messages', async () => {
const response = await senderControlsSQSConsumer.sendRequest({
method: 'GET',
path: `${apiPath}?isBatch=true`
});

await verify({
receiverControls: sqsConsumerControls,
senderControls: senderControlsSQSConsumer,
response,
apiPath,
withError: false,
isBatch: true
});
});
});

describe('[handleMessageBatch] message processed with success', () => {
const sqsConsumerControls = new ProcessControls({
appPath: path.join(__dirname, 'sqs-consumer'),
port: 3216,
useGlobalAgent: true,
env: {
AWS_SQS_QUEUE_URL: `${queueUrlPrefix}${queueName}-consumer`,
AWS_SDK_CLIENT_SQS_REQUIRE: version,
HANDLE_MESSAGE_BATCH: true
}
});

ProcessControls.setUpHooksWithRetryTime(retryTime, sqsConsumerControls);

const apiPath = '/send-message/v3';

it('receives message', async () => {
const response = await senderControlsSQSConsumer.sendRequest({
method: 'GET',
path: apiPath
});
await verify({
receiverControls: sqsConsumerControls,
senderControls: senderControlsSQSConsumer,
response,
apiPath,
withError: false,
isBatch: false
});
});

it('receives messages', async () => {
const response = await senderControlsSQSConsumer.sendRequest({
method: 'GET',
path: `${apiPath}?isBatch=true`
});

await verify({
receiverControls: sqsConsumerControls,
senderControls: senderControlsSQSConsumer,
response,
apiPath,
withError: false,
isBatch: true,
isSQSConsumer: true
});
});
});

describe('message not processed with success', () => {
const sqsConsumerControls = new ProcessControls({
appPath: path.join(__dirname, 'sqs-consumer'),
port: 3216,
useGlobalAgent: true,
env: {
AWS_SQS_QUEUE_URL: `${queueUrlPrefix}${queueName}-consumer`,
AWS_SDK_CLIENT_SQS_REQUIRE: version,
AWS_SQS_RECEIVER_ERROR: 'true'
}
});

ProcessControls.setUpHooksWithRetryTime(retryTime, sqsConsumerControls);

const apiPath = '/send-message/v3';

it('fails to receive a message', async () => {
const response = await senderControlsSQSConsumer.sendRequest({
method: 'GET',
path: apiPath
});

await verify({
receiverControls: sqsConsumerControls,
senderControls: senderControlsSQSConsumer,
response,
apiPath,
withError: 'receiver',
isBatch: false
});
});
});
});
}

describe('messages sent in batch', () => {
receivingMethods.forEach(sqsReceiveMethod => {
Expand Down Expand Up @@ -413,18 +544,18 @@ function start(version) {
});
});

async function verify({ receiverControls, senderControls, response, apiPath, withError, isBatch }) {
async function verify({ receiverControls, senderControls, response, apiPath, withError, isBatch, isSQSConsumer }) {
if (withError === 'sender') {
expect(response.error).to.equal('MissingParameter: The request must contain the parameter MessageBody.');
} else {
await retry(async () => {
if (isBatch) {
verifyResponseAndBatchMessage(response, receiverControls);
verifyResponseAndBatchMessage(response, receiverControls, isSQSConsumer);
} else {
verifyResponseAndMessage(response, receiverControls);
}
const spans = await agentControls.getSpans();
verifySpans({ receiverControls, senderControls, spans, apiPath, withError, isBatch });
verifySpans({ receiverControls, senderControls, spans, apiPath, withError, isBatch, isSQSConsumer });
}, retryTime);
}
}
Expand Down Expand Up @@ -465,14 +596,30 @@ function start(version) {
}, retryTime);
}

function verifySpans({ receiverControls, senderControls, spans, apiPath, withError, isBatch }) {
function verifySpans({ receiverControls, senderControls, spans, apiPath, withError, isBatch, isSQSConsumer }) {
const httpEntry = verifyHttpRootEntry({ spans, apiPath, pid: String(senderControls.getPid()) });
const sqsExit = verifySQSExit({ senderControls, spans, parent: httpEntry, withError });
verifyHttpExit({ spans, parent: httpEntry, pid: String(senderControls.getPid()) });

if (withError !== 'publisher') {
const sqsEntry = verifySQSEntry({ receiverControls, spans, parent: sqsExit, withError, isBatch });
verifyHttpExit({ spans, parent: sqsEntry, pid: String(receiverControls.getPid()) });

if (isSQSConsumer) {
verifyHttpExit({
spans,
parent: sqsEntry,
pid: String(receiverControls.getPid()),
testMethod: (exitSpans, tests) => {
return expectExactlyNMatching(exitSpans, 4, tests);
}
});
} else {
verifyHttpExit({
spans,
parent: sqsEntry,
pid: String(receiverControls.getPid())
});
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const awsProducts = [
require('./sqs')
];

const sqsConsumer = require('./sqs-consumer');

/** @type {Object.<string, import('./instana_aws_product').InstanaAWSProduct} */
const operationMap = {};

Expand All @@ -28,6 +30,8 @@ let isActive = false;
let onFileLoaded = false;

exports.init = function init() {
sqsConsumer.init();

/**
* @aws-sdk/smithly-client >= 3.36.0 changed how the dist structure gets delivered
* https://github.com/aws/aws-sdk-js-v3/blob/main/packages/smithy-client/CHANGELOG.md#3360-2021-10-08
Expand Down
Loading

0 comments on commit f8bf9e7

Please sign in to comment.