Skip to content

feat: added support for graphql-subscriptions v3 #1446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 18, 2024
Merged
14 changes: 13 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@
"got": "^14.4.4",
"got-v11": "npm:got@^11.8.6",
"graphql": "^16.9.0",
"graphql-subscriptions": "^2.0.0",
"graphql-subscriptions-v2": "npm:graphql-subscriptions@^2.0.0",
"graphql-subscriptions": "^3.0.0",
"graphql-tag": "^2.12.6",
"graphql-ws": "^5.16.0",
"husky": "^7.0.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ process.on('SIGTERM', () => {
process.exit(0);
});

require('./mockVersion');
require('../../../..')();

const cls = require('../../../../../core/src/tracing/cls');
Expand All @@ -22,10 +23,14 @@ const port = require('../../../test_util/app-port')();

const pubsub = new graphqlSubscriptions.PubSub();
const eventName = 'event-name';
const asyncIterator = pubsub.asyncIterator(eventName);

const version = process.env.GRAPHQL_SUBSCRIPTIONS_VERSION;
const isV2 = version === 'v2';

const iterator = isV2 ? pubsub.asyncIterator(eventName) : pubsub.asyncIterableIterator(eventName);

const app = express();
const logPrefix = `PubSub AsyncIterator pull-before-push app (${process.pid}):\t`;
const logPrefix = `PubSub iterator pull-before-push app (${process.pid}):\t`;

if (process.env.WITH_STDOUT) {
app.use(morgan(`${logPrefix}:method :url :status`));
Expand All @@ -37,17 +42,17 @@ app.get('/', (req, res) => {

const valuesReadFromCls = [];

// Calling asyncIterator.pullValue is what happens _before_ the pubsub.publish happens (possibly during
// Calling iterator.pullValue is what happens _before_ the pubsub.publish happens (possibly during
// pubsub.subscribe). The promises returned by pullValue will only resolve after we have pushed values.
asyncIterator.pullValue().then(event1 => {
iterator.pullValue().then(event1 => {
// Chronologically, everything inside the then-handler this happens after cls.ns.set (see below). Due to custom
// queueing in pubsub_async_iterator, the cls context would get lost though (unless we fix it).
log(event1);
valuesReadFromCls.push(cls.ns.get('key', true));
asyncIterator.pullValue().then(event2 => {
iterator.pullValue().then(event2 => {
log(event2);
valuesReadFromCls.push(cls.ns.get('key', true));
asyncIterator.pullValue().then(event3 => {
iterator.pullValue().then(event3 => {
log(event3);
valuesReadFromCls.push(cls.ns.get('key', true));
});
Expand All @@ -57,11 +62,11 @@ asyncIterator.pullValue().then(event1 => {
app.get('/pull-before-push', (req, res) => {
// This order of events (pulling values before pushing values) does not work without some tracing blood magic.

// Calling asyncIterator.pushValue is what happens during pubsub.publish('event-name', { ... })
// Calling iterator.pushValue is what happens during pubsub.publish('event-name', { ... })
cls.ns.set('key', 'test-value');
asyncIterator.pushValue({ name: 'event-01' });
asyncIterator.pushValue({ name: 'event-02' });
asyncIterator.pushValue({ name: 'event-03' });
iterator.pushValue({ name: 'event-01' });
iterator.pushValue({ name: 'event-02' });
iterator.pushValue({ name: 'event-03' });
setTimeout(() => {
res.send(valuesReadFromCls);
}, 200);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* (c) Copyright IBM Corp. 2024
*/

'use strict';

const mock = require('mock-require');
const hook = require('../../../../../core/src/util/hook');

const GRAPHQL_SUBSCRIPTIONS_VERSION = process.env.GRAPHQL_SUBSCRIPTIONS_VERSION;
const GRAPHQL_SUBSCRIPTIONS_REQUIRE =
process.env.GRAPHQL_SUBSCRIPTIONS_VERSION === 'latest'
? 'graphql-subscriptions'
: `graphql-subscriptions-${GRAPHQL_SUBSCRIPTIONS_VERSION}`;

if (GRAPHQL_SUBSCRIPTIONS_REQUIRE !== 'graphql-subscriptions') {
mock('graphql-subscriptions', GRAPHQL_SUBSCRIPTIONS_REQUIRE);
}

const originalOnFileLoad = hook.onFileLoad;
hook.onFileLoad = function onFileLoad() {
if (arguments[0].source === '\\/graphql-subscriptions\\/dist\\/pubsub-async-iterator\\.js') {
const str = arguments[0].source.replace('graphql-subscriptions', GRAPHQL_SUBSCRIPTIONS_REQUIRE);
const reg = new RegExp(str, '');
arguments[0] = reg;
return originalOnFileLoad.apply(this, arguments);
}

return originalOnFileLoad.apply(this, arguments);
};
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,48 @@ const globalAgent = require('../../../globalAgent');

const mochaSuiteFn = supportedVersion(process.versions.node) ? describe : describe.skip;

mochaSuiteFn('tracing/graphql-subscriptions - PubSub/async iterator (pull before push)', function () {
this.timeout(config.getTestTimeout());

globalAgent.setUpCleanUpHooks();
let controls;

before(async () => {
controls = new ProcessControls({
dirname: __dirname,
useGlobalAgent: true
['latest', 'v2'].forEach(version => {
mochaSuiteFn(`tracing/graphql-subscriptions@${version} - PubSub/async iterator (pull before push)`, function () {
this.timeout(config.getTestTimeout());

globalAgent.setUpCleanUpHooks();
let controls;

before(async () => {
controls = new ProcessControls({
dirname: __dirname,
env: {
GRAPHQL_SUBSCRIPTIONS_VERSION: version
},
useGlobalAgent: true
});

await controls.startAndWaitForAgentConnection();
});

await controls.startAndWaitForAgentConnection();
});
beforeEach(async () => {
await globalAgent.instance.clearReceivedTraceData();
});

beforeEach(async () => {
await globalAgent.instance.clearReceivedTraceData();
});
after(async () => {
await controls.stop();
});

after(async () => {
await controls.stop();
});
afterEach(async () => {
await controls.clearIpcMessages();
});

afterEach(async () => {
await controls.clearIpcMessages();
it('should keep cls context when pulling before pushing', () =>
controls
.sendRequest({
method: 'GET',
path: '/pull-before-push'
})
.then(valuesReadFromCls => {
expect(valuesReadFromCls).to.have.lengthOf(3);
expect(valuesReadFromCls[0]).to.equal('test-value');
expect(valuesReadFromCls[1]).to.equal('test-value');
expect(valuesReadFromCls[2]).to.equal('test-value');
}));
});

it('should keep cls context when pulling before pushing', () =>
controls
.sendRequest({
method: 'GET',
path: '/pull-before-push'
})
.then(valuesReadFromCls => {
expect(valuesReadFromCls).to.have.lengthOf(3);
expect(valuesReadFromCls[0]).to.equal('test-value');
expect(valuesReadFromCls[1]).to.equal('test-value');
expect(valuesReadFromCls[2]).to.equal('test-value');
}));
});
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,14 @@ module.exports = function exportSchema() {
},
subscribe: (__, { id }) => {
pinoLogger.warn(`subscribe: ${id}`);
return pubsub.asyncIterator('characterUpdated');

// for graphql-subscriptions, asyncIterator is replaced with asyncIterableIterator in v3
if (pubsub?.asyncIterableIterator) {
return pubsub.asyncIterableIterator('characterUpdated');
} else {
// v2 or lesser
return pubsub.asyncIterator('characterUpdated');
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,9 @@ function verifySpansForSubscriptionUpdates(spans, triggerUpdateVia) {
}

const subscribeHttpEntryInClientApp = verifyHttpEntry(null, /\/subscription/, spans);

expect(spans.length).to.be.eql(8);

// verify that the subscription entry has no children
spans.forEach(span => {
expect(span.p).to.not.equal(subscribeHttpEntryInClientApp.s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ const CLS_CONTEXT_SYMBOL = Symbol('_instana_cls_context');

exports.init = () => {
hook.onModuleLoad('graphql-subscriptions', instrumentModule);
// In v3, pubsub-async-iterator is replaced with pubsub-async-iterable-iterator
hook.onFileLoad(/\/graphql-subscriptions\/dist\/pubsub-async-iterable-iterator\.js/, instrumentAsyncIterableIterator);

// In v2, pubsub-async-iterator is available
hook.onFileLoad(/\/graphql-subscriptions\/dist\/pubsub-async-iterator\.js/, instrumentAsyncIterator);
};

Expand All @@ -40,11 +44,17 @@ function instrumentAsyncIterator(pubSubAsyncIterator) {
shimmer.wrap(pubSubAsyncIterator.PubSubAsyncIterator.prototype, 'pullValue', shimPullValue);
}

function instrumentAsyncIterableIterator(pubSubAsyncIterator) {
shimmer.wrap(pubSubAsyncIterator.PubSubAsyncIterableIterator.prototype, 'pushValue', shimPushValue);
shimmer.wrap(pubSubAsyncIterator.PubSubAsyncIterableIterator.prototype, 'pullValue', shimPullValue);
}

function shimPushValue(originalFunction) {
return function (event) {
if (isActive && event && typeof event === 'object' && cls.ns.active) {
event[CLS_CONTEXT_SYMBOL] = cls.ns.active;
}

return originalFunction.apply(this, arguments);
};
}
Expand Down