diff --git a/package.json b/package.json index e5d4da6..9480a12 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,7 @@ "mocha": "^3.0.0", "remap-istanbul": "^0.6.4", "tslint": "^3.13.0", - "typescript": "^1.8.10", + "typescript": "^2.0.0", "typings": "^1.3.2" }, "typings": "dist/index.d.ts", diff --git a/src/index.ts b/src/index.ts index 71f845b..df423ee 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1 +1 @@ -export { FilteredPubSub, SubscriptionManager } from './pubsub'; \ No newline at end of file +export { PubSub, SubscriptionManager } from './pubsub'; diff --git a/src/pubsub.ts b/src/pubsub.ts index 2e8cac0..a44d8e4 100644 --- a/src/pubsub.ts +++ b/src/pubsub.ts @@ -2,7 +2,7 @@ // This is basically just event emitters wrapped with a function that filters messages. // import { EventEmitter } from 'events'; -import graphql, { +import graphql, { GraphQLSchema, GraphQLError, validate, @@ -21,7 +21,13 @@ import { subscriptionHasSingleRootField } from './validation'; -export class FilteredPubSub { +export interface PubSubEngine { + publish(triggerName: string, payload: any): boolean + subscribe(triggerName: string, onMessage: Function): number + unsubscribe(subId: number) +} + +export class PubSub implements PubSubEngine { private ee: EventEmitter; private subscriptions: {[key: string]: [string, Function]}; private subIdCounter: number; @@ -32,27 +38,28 @@ export class FilteredPubSub { this.subIdCounter = 0; } - public publish(triggerName: string, payload: any){ + public publish(triggerName: string, payload: any): boolean { this.ee.emit(triggerName, payload); + // Not using the value returned from emit method because it gives + // irrelevant false when there are no listeners. + return true; } - public subscribe(triggerName: string, filterFunc: Function, handler: Function): number{ - // notify handler only if filterFunc returns true - const onMessage = (data) => filterFunc(data) ? handler(data) : null + public subscribe(triggerName: string, onMessage: Function): number { this.ee.addListener(triggerName, onMessage); this.subIdCounter = this.subIdCounter + 1; this.subscriptions[this.subIdCounter] = [triggerName, onMessage]; - return this.subIdCounter; + return this.subIdCounter; } - public unsubscribe(subId: number): void { + public unsubscribe(subId: number) { const [triggerName, onMessage] = this.subscriptions[subId]; delete this.subscriptions[subId]; this.ee.removeListener(triggerName, onMessage); } } -export class ValidationError extends Error{ +export class ValidationError extends Error { errors: Array; message: string; @@ -75,21 +82,23 @@ export interface SubscriptionOptions { // This manages actual GraphQL subscriptions. export class SubscriptionManager { - private pubsub: FilteredPubSub; + private pubsub: PubSubEngine; private schema: GraphQLSchema; private setupFunctions: { [subscriptionName: string]: Function }; private subscriptions: { [externalId: number]: Array}; private maxSubscriptionId: number; - constructor(options: { schema: GraphQLSchema, setupFunctions: {[subscriptionName: string]: Function} }){ - this.pubsub = new FilteredPubSub(); + constructor(options: { schema: GraphQLSchema, + setupFunctions: {[subscriptionName: string]: Function}, + pubsub: PubSubEngine }){ + this.pubsub = options.pubsub; this.schema = options.schema; this.setupFunctions = options.setupFunctions || {}; this.subscriptions = {}; this.maxSubscriptionId = 0; } - public publish(triggerName: string, payload: any){ + public publish(triggerName: string, payload: any) { this.pubsub.publish(triggerName, payload); } @@ -126,7 +135,7 @@ export class SubscriptionManager { const fields = this.schema.getSubscriptionType().getFields(); rootField.arguments.forEach( arg => { // we have to get the one arg's definition from the schema - const argDefinition = fields[subscriptionName].args.filter( + const argDefinition = fields[subscriptionName].args.filter( argDef => argDef.name === arg.name.value )[0]; args[argDefinition.name] = valueFromAST(arg.value, argDefinition.type, options.variables); @@ -146,7 +155,7 @@ export class SubscriptionManager { Object.keys(triggerMap).forEach( triggerName => { // 2. generate the filter function and the handler function const onMessage = rootValue => { - // rootValue is the payload sent by the event emitter / trigger + // rootValue is the payload sent by the event emitter / trigger // by convention this is the value returned from the mutation resolver try { @@ -166,9 +175,12 @@ export class SubscriptionManager { } } + const isTriggering: Function = triggerMap[triggerName]; + // 3. subscribe and return the subscription id this.subscriptions[externalSubscriptionId].push( - this.pubsub.subscribe(triggerName, triggerMap[triggerName], onMessage) + // Will run the onMessage function only if the message passes the filter function. + this.pubsub.subscribe(triggerName, (data) => isTriggering(data) && onMessage(data)) ); }); return externalSubscriptionId; @@ -180,4 +192,4 @@ export class SubscriptionManager { this.pubsub.unsubscribe(internalId); }); } -} \ No newline at end of file +} diff --git a/src/test/tests.ts b/src/test/tests.ts index 60aa1e0..3e50300 100644 --- a/src/test/tests.ts +++ b/src/test/tests.ts @@ -14,39 +14,32 @@ import { } from 'graphql'; import { - FilteredPubSub, + PubSub, SubscriptionManager, } from '../pubsub'; import { subscriptionHasSingleRootField } from '../validation'; -describe('FilteredPubSub', function() { +describe('PubSub', function() { it('can subscribe and is called when events happen', function(done) { - const ps = new FilteredPubSub(); - ps.subscribe('a', () => true, payload => { + const ps = new PubSub(); + ps.subscribe('a', payload => { expect(payload).to.equals('test'); done(); }); - ps.publish('a', 'test'); - }); - - it('can filter events that get sent to subscribers', function(done) { - const ps = new FilteredPubSub(); - ps.subscribe('a', payload => payload !== 'bad', payload => { - expect(payload).to.equals('good'); - done(); - }); - ps.publish('a', 'bad'); - ps.publish('a', 'good'); + const succeed = ps.publish('a', 'test'); + expect(succeed).to.be.true; }); it('can unsubscribe', function(done) { - const ps = new FilteredPubSub(); - const subId = ps.subscribe('a', () => true, payload => { + const ps = new PubSub(); + const subId = ps.subscribe('a', payload => { assert(false); }); ps.unsubscribe(subId); - ps.publish('a', 'test'); + const succeed = ps.publish('a', 'test'); + expect(succeed).to.be.true; // True because publish success is not + // indicated by trigger having subscriptions done(); // works because pubsub is synchronous }); }); @@ -112,6 +105,7 @@ describe('SubscriptionManager', function() { }; }, }, + pubsub: new PubSub(), }); it('throws an error if query is not valid', function() { const query = 'query a{ testInt }'; @@ -234,6 +228,11 @@ describe('SubscriptionManager', function() { setTimeout(done, 30); }); + it('throws an error when trying to unsubscribe from unknown id', function () { + expect(() => subManager.unsubscribe(123)) + .to.throw('undefined'); + }); + it('calls the error callback if there is an execution error', function(done) { const query = `subscription X($uga: Boolean!){ testSubscription @skip(if: $uga)