Skip to content

Commit

Permalink
fix(grpc-js): capture the correct destination host per grpc-js client
Browse files Browse the repository at this point in the history
This fixes an issue where we would record the wrong destination
host/port when multiple @grpc/grpc-js clients are used by the
application.
  • Loading branch information
Bastian Krol authored and basti1302 committed Mar 3, 2023
1 parent f94646c commit 5bc3188
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 44 deletions.
37 changes: 28 additions & 9 deletions packages/collector/test/tracing/protocols/grpc-js/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const log = require('@instana/core/test/test_util/log').getLogger(logPrefix);
const PROTO_PATH = path.join(__dirname, 'protos/test.proto');

let client;
let anotherClient;
let makeUnaryCall;
let startServerSideStreaming;
let startClientSideStreaming;
Expand All @@ -41,6 +42,7 @@ function runClient() {

const testProto = grpc.loadPackageDefinition(packageDefinition).instana.node.grpc.test;
client = new testProto.TestService('localhost:50051', grpc.credentials.createInsecure());
anotherClient = new testProto.TestService('127.0.0.1:50051', grpc.credentials.createInsecure());

makeUnaryCall = unaryCall;
startServerSideStreaming = serverSideStreaming;
Expand All @@ -50,9 +52,9 @@ function runClient() {

runClient();

function unaryCall(cancel, triggerError, cb) {
function unaryCall({ cancel, triggerError, grpcJsClient = client }, cb) {
const parameter = triggerError ? 'error' : 'request';
const call = client.makeUnaryCall({ parameter }, cb);
const call = grpcJsClient.makeUnaryCall({ parameter }, cb);

if (cancel) {
setTimeout(() => {
Expand All @@ -61,7 +63,7 @@ function unaryCall(cancel, triggerError, cb) {
}
}

function serverSideStreaming(cancel, triggerError, cb) {
function serverSideStreaming({ cancel, triggerError }, cb) {
const replies = [];
const call = client.startServerSideStreaming({
parameter: paramFor(cancel, triggerError)
Expand Down Expand Up @@ -91,7 +93,7 @@ function serverSideStreaming(cancel, triggerError, cb) {
});
}

function clientSideStreaming(cancel, triggerError, cb) {
function clientSideStreaming({ cancel, triggerError }, cb) {
const call = client.startClientSideStreaming(cb);

if (triggerError) {
Expand All @@ -118,7 +120,7 @@ function clientSideStreaming(cancel, triggerError, cb) {
}
}

function bidiStreaming(cancel, triggerError, cb) {
function bidiStreaming({ cancel, triggerError }, cb) {
const replies = [];
const call = client.startBidiStreaming();
let cbCalled = false;
Expand Down Expand Up @@ -176,7 +178,7 @@ app.get('/', (req, res) => {
});

app.post('/unary-call', (req, res) => {
makeUnaryCall(req.query.cancel, req.query.error, (err, reply) => {
makeUnaryCall(optionsFromRequest(req), (err, reply) => {
if (err) {
pinoLogger.error(err);
return res.send(err);
Expand All @@ -188,7 +190,7 @@ app.post('/unary-call', (req, res) => {
});

app.post('/server-stream', (req, res) => {
startServerSideStreaming(req.query.cancel, req.query.error, (err, replyMessages) => {
startServerSideStreaming(optionsFromRequest(req), (err, replyMessages) => {
if (err) {
pinoLogger.error(err);
return res.send(err);
Expand All @@ -200,7 +202,7 @@ app.post('/server-stream', (req, res) => {
});

app.post('/client-stream', (req, res) => {
startClientSideStreaming(req.query.cancel, req.query.error, (err, reply) => {
startClientSideStreaming(optionsFromRequest(req), (err, reply) => {
if (err) {
pinoLogger.error(err);
return res.send(err);
Expand All @@ -212,7 +214,7 @@ app.post('/client-stream', (req, res) => {
});

app.post('/bidi-stream', (req, res) => {
startBidiStreaming(req.query.cancel, req.query.error, (err, replyMessages) => {
startBidiStreaming(optionsFromRequest(req), (err, replyMessages) => {
if (err) {
pinoLogger.error(err);
return res.send(err);
Expand All @@ -222,6 +224,23 @@ app.post('/bidi-stream', (req, res) => {
});
});

function optionsFromRequest(req) {
return { cancel: req.query.cancel, triggerError: req.query.error };
}

app.post('/two-different-hosts', (req, res) => {
makeUnaryCall({ grpcJsClient: client }, (err1, reply1) => {
const message1 = typeof reply1.getMessage === 'function' ? reply1.getMessage() : reply1.message;
makeUnaryCall({ grpcJsClient: anotherClient }, (err2, reply2) => {
const message2 = typeof reply2.getMessage === 'function' ? reply2.getMessage() : reply2.message;
return res.send({
reply1: message1,
reply2: message2
});
});
});
});

app.post('/shutdown', (req, res) => {
client.close();
return res.send('Good bye :)');
Expand Down
70 changes: 51 additions & 19 deletions packages/collector/test/tracing/protocols/grpc-js/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,37 @@ mochaSuiteFn('tracing/grpc-js', function () {
expect(response.reply).to.equal('received: request');
return retry(() =>
agentControls.getSpans().then(spans => {
expectExactlyOneMatching(spans, checkHttpEntry('/unary-call'));
expectExactlyOneMatching(spans, checkHttpEntry({ url: '/unary-call' }));
expect(getSpansByName(spans, 'rpc-client')).to.be.empty;
expect(getSpansByName(spans, 'rpc-server')).to.be.empty;
})
);
}));
});

describe('multiple hosts', () => {
const { clientControls } = createProcesses();

it('call two different hosts', async () => {
const url = '/two-different-hosts';
const response = await clientControls.sendRequest({
method: 'POST',
path: url
});

expect(response.reply1).to.equal('received: request');
expect(response.reply2).to.equal('received: request');

let spans;
await retry(async () => {
spans = await agentControls.getSpans();
expect(spans.length).to.eql(7);
});
const httpEntry = expectExactlyOneMatching(spans, checkHttpEntry({ url }));
expectExactlyOneMatching(spans, checkGrpcClientSpan({ httpEntry, clientControls, url, host: 'localhost' }));
expectExactlyOneMatching(spans, checkGrpcClientSpan({ httpEntry, clientControls, url, host: '127.0.0.1' }));
});
});
});

function createProcesses(env = {}, opts = { isMali: false }) {
Expand Down Expand Up @@ -169,20 +193,19 @@ function createQueryParams(cancel, erroneous) {
}
}

function waitForTrace(serverControls, clientControls, url, cancel, erroneous) {
return retry(() =>
agentControls.getSpans().then(spans => {
expect(spans.length).to.eql(5);
checkTrace(serverControls, clientControls, spans, url, cancel, erroneous);
})
);
async function waitForTrace(serverControls, clientControls, url, cancel, erroneous) {
return retry(async () => {
const spans = await agentControls.getSpans();
expect(spans.length).to.eql(5);
checkTrace(serverControls, clientControls, spans, url, cancel, erroneous);
});
}

function checkTrace(serverControls, clientControls, spans, url, cancel, erroneous) {
const httpEntry = expectExactlyOneMatching(spans, checkHttpEntry(url));
const httpEntry = expectExactlyOneMatching(spans, checkHttpEntry({ url }));
const grpcExit = expectExactlyOneMatching(
spans,
checkGrpcClientSpan(httpEntry, clientControls, url, cancel, erroneous)
checkGrpcClientSpan({ httpEntry, clientControls, url, cancel, erroneous })
);

// Except for server-streaming and bidi-streaming, we cancel the call immediately on the client, so it usually never
Expand All @@ -193,27 +216,34 @@ function checkTrace(serverControls, clientControls, spans, url, cancel, erroneou
if (!cancel || url === '/server-stream' || url === '/bidi-stream') {
const grpcEntry = expectExactlyOneMatching(
spans,
checkGrpcServerSpan(grpcExit, serverControls, url, cancel, erroneous)
checkGrpcServerSpan({ grpcExit, serverControls, url, cancel, erroneous })
);

expectExactlyOneMatching(spans, checkLogSpanDuringGrpcEntry(grpcEntry, url, erroneous));
expectExactlyOneMatching(spans, checkLogSpanDuringGrpcEntry({ grpcEntry, url, erroneous }));
} else {
// Would be nice to also check for the log span from the interceptor but will actually never be created because at
// that time, the parent span is an exit span (the GRPC exit). If only log spans were intermediate spans :-)
// expectExactlyOneMatching(spans, checkLogSpanFromClientInterceptor.bind(null, httpEntry));
expectExactlyOneMatching(spans, checkLogSpanAfterGrpcExit(httpEntry, url, cancel, erroneous));
expectExactlyOneMatching(spans, checkLogSpanAfterGrpcExit({ httpEntry, url, cancel, erroneous }));
}
}

function checkHttpEntry(url) {
function checkHttpEntry({ url }) {
return [
span => expect(span.n).to.equal('node.http.server'),
span => expect(span.k).to.equal(constants.ENTRY),
span => expect(span.data.http.url).to.equal(url)
];
}

function checkGrpcClientSpan(httpEntry, clientControls, url, cancel, erroneous) {
function checkGrpcClientSpan({
httpEntry,
clientControls,
url,
cancel = false,
erroneous = false,
host = 'localhost'
}) {
let expectations = [
span => expect(span.n).to.equal('rpc-client'),
span => expect(span.k).to.equal(constants.EXIT),
Expand All @@ -224,7 +254,7 @@ function checkGrpcClientSpan(httpEntry, clientControls, url, cancel, erroneous)
span => expect(span.data.rpc).to.exist,
span => expect(span.data.rpc.flavor).to.equal('grpc'),
span => expect(span.data.rpc.call).to.equal(rpcCallNameForUrl(url)),
span => expect(span.data.rpc.host).to.equal('localhost'),
span => expect(span.data.rpc.host).to.equal(host),
span => expect(span.data.rpc.port).to.equal('50051')
];
if (erroneous) {
Expand All @@ -243,7 +273,7 @@ function checkGrpcClientSpan(httpEntry, clientControls, url, cancel, erroneous)
return expectations;
}

function checkGrpcServerSpan(grpcExit, serverControls, url, cancel, erroneous) {
function checkGrpcServerSpan({ grpcExit, serverControls, url, cancel, erroneous }) {
let expectations = [
span => expect(span.n).to.equal('rpc-server'),
span => expect(span.k).to.equal(constants.ENTRY),
Expand Down Expand Up @@ -271,7 +301,7 @@ function checkGrpcServerSpan(grpcExit, serverControls, url, cancel, erroneous) {
return expectations;
}

function checkLogSpanAfterGrpcExit(httpEntry, url, cancel, erroneous) {
function checkLogSpanAfterGrpcExit({ httpEntry, url, cancel, erroneous }) {
const expectations = [
span => expect(span.n).to.equal('log.pino'),
span => expect(span.k).to.equal(constants.EXIT),
Expand All @@ -288,7 +318,7 @@ function checkLogSpanAfterGrpcExit(httpEntry, url, cancel, erroneous) {
return expectations;
}

function checkLogSpanDuringGrpcEntry(grpcEntry, url, erroneous) {
function checkLogSpanDuringGrpcEntry({ grpcEntry, url, erroneous }) {
const expectations = [
span => expect(span.n).to.equal('log.pino'),
span => expect(span.k).to.equal(constants.EXIT),
Expand All @@ -306,6 +336,8 @@ function checkLogSpanDuringGrpcEntry(grpcEntry, url, erroneous) {
function rpcCallNameForUrl(url) {
switch (url) {
case '/unary-call':
// fall through
case '/two-different-hosts':
return 'instana.node.grpc.test.TestService/MakeUnaryCall';
case '/server-stream':
return 'instana.node.grpc.test.TestService/StartServerSideStreaming';
Expand Down
38 changes: 22 additions & 16 deletions packages/core/src/tracing/instrumentation/protocols/grpcJs.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,24 @@ function instrumentServer(serverModule) {
}

function instrumentClient(clientModule) {
let address;

class ClientMock extends clientModule.Client {
constructor(_address) {
address = _address;
class InstanaWrapper extends clientModule.Client {
constructor(address) {
super(...arguments);

try {
const hostAndPort = splitHostPort(address);
if (hostAndPort.port && typeof hostAndPort.port === 'number') {
hostAndPort.port = hostAndPort.port.toString();
}
this._hostAndPort = hostAndPort;
} catch (e) {
this._hostAndPort = {};
logger.warn(`Failed to parse GRPC-JS destination addresss: ${address}`);
}
}
}

clientModule.Client = ClientMock;
clientModule.Client = InstanaWrapper;

const fnArr = [
{ name: 'makeUnaryRequest', responseStream: false, requestStream: false },
Expand All @@ -68,12 +76,6 @@ function instrumentClient(clientModule) {

shimmer.wrap(clientModule.Client.prototype, name, function (origFn) {
return function (method) {
const hostAndPort = splitHostPort(address);

if (hostAndPort.port && typeof hostAndPort.port === 'number') {
hostAndPort.port = hostAndPort.port.toString();
}

const originalArgs = copyArgs(arguments);

const skipTracingResult = cls.skipExitTracing({ isActive, extendedResponse: true });
Expand All @@ -93,7 +95,7 @@ function instrumentClient(clientModule) {
this,
origFn,
originalArgs,
hostAndPort,
this._hostAndPort,
method,
requestStream,
responseStream,
Expand Down Expand Up @@ -378,7 +380,7 @@ function instrumentedClientMethod(
ctx,
originalFunction,
originalArgs,
address,
hostAndPort,
rpcPath,
requestStream,
responseStream,
Expand All @@ -390,8 +392,8 @@ function instrumentedClientMethod(
span.stack = tracingUtil.getStackTrace(instrumentedClientMethod);

span.data.rpc = {
host: address.host,
port: address.port,
host: hostAndPort.host,
port: hostAndPort.port,
call: dropLeadingSlash(rpcPath),
flavor: 'grpc'
};
Expand Down Expand Up @@ -441,6 +443,10 @@ function readMetadata(metadata, key) {
// Copied from https://github.com/grpc/grpc-node/blob/master/packages/grpc-js/src/uri-parser.ts
const NUMBER_REGEX = /^\d+$/;
function splitHostPort(path) {
if (typeof path !== 'string') {
return { host: null, port: null };
}

if (path.startsWith('[')) {
const hostEnd = path.indexOf(']');

Expand Down

0 comments on commit 5bc3188

Please sign in to comment.