From 1678acbff6a4fcbccff342d094c37b2e36bd0fff Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Wed, 30 Sep 2015 16:30:11 -0400 Subject: [PATCH 01/26] Started tests for event streams --- test/test_api.js | 9 +++ test/test_event_streams.js | 117 +++++++++++++++++++++++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 test/test_event_streams.js diff --git a/test/test_api.js b/test/test_api.js index d40b95e..d00b135 100644 --- a/test/test_api.js +++ b/test/test_api.js @@ -353,6 +353,15 @@ describe('Zetta Api', function() { .end(done) }); + it('should contain link for event stream', function(done) { + request(getHttpServer(app)) + .get('/') + .expect(getBody(function(res, body) { + hasLinkRel(body.links, rels.events); + })) + .end(done) + }); + it('should use a default server name if none has been provided', function(done) { var app = zetta({ registry: reg, peerRegistry: peerRegistry }).silent()._run(); diff --git a/test/test_event_streams.js b/test/test_event_streams.js new file mode 100644 index 0000000..a6d82cb --- /dev/null +++ b/test/test_event_streams.js @@ -0,0 +1,117 @@ +var assert = require('assert'); +var WebSocket = require('ws'); +var zetta = require('./..'); +var zettacluster = require('zetta-cluster'); +var Driver = require('./fixture/example_driver'); + +describe('Event Streams', function() { + var cluster = null; + var urls = []; + var baseUrl = '/events'; + + beforeEach(function(done) { + urls = []; + cluster = zettacluster({ zetta: zetta }) + .server('cloud') + .server('hub', [Driver], ['cloud']) + .on('ready', function() { + app = cluster.servers['cloud']; + urls.push('localhost:' + cluster.servers['cloud']._testPort); + urls.push('localhost:' + cluster.servers['hub']._testPort); + done(); + }) + .run(function(err){ + if (err) { + return done(err); + } + }); + }); + + afterEach(function() { + cluster.stop(); + }); + + describe('Websocket API', function() { + var itBoth = function(testMsg, test) { + it('for cloud, ' + testMsg, test.bind(null, 0)); + it('for hub, ' + testMsg, test.bind(null, 1)); + }; + + itBoth('subscribing to a topic receives a subscription-ack', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { action: 'subscribe', topic: 'hub/led/1234/state' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, 'hub/led/1234/state'); + assert(json.subscriptionId); + done(); + }); + }); + ws.on('error', done); + }); + + it('unsubscribing to a topic receives a unsubscription-ack', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { action: 'subscribe', topic: 'hub/led/1234/state' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + var msg = { action: 'unsubscribe', subscriptionId, json.subscriptionId }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json2 = JSON.parse(buffer); + assert.equal(json2.type, 'unsubscribe-ack'); + assert(json2.timestamp); + assert.equal(json2.subscriptionId, json.subscriptionId); + done(); + }); + }); + }); + ws.on('error', done); + }); + + it('verify error message format', function(){}) + + it('specific topic subscription only receives messages with that topic', function() {}) + + it('wildcard topic receives all messages for all topics', function() {}) + + it('topic that doesnt exist still opens stream', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { action: 'subscribe', topic: 'random_topic' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, 'random_topic'); + assert(json.subscriptionId); + done(); + }); + }); + ws.on('error', done); + }) + + it('wildcard and specific topic will each publish a message on a subscription', function() {}) + + it('topic format is {server}/{type}/{id}/{streamName}', function() {}) + + it('adding limit to subscription should limit number of messages received', function(){}) + + it('when limit is reached a unsubscribe-ack should be received', function(){}) + + }); + + describe('SPDY API', function() { + }); + +}); From d0bfbb69203f8460c9cfb166d10921178cc0bf88 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Thu, 1 Oct 2015 10:08:38 -0400 Subject: [PATCH 02/26] Adding tests for websocket refactor. --- test/test_event_streams.js | 182 ++++++++++++++++++++++++++++++++++--- 1 file changed, 171 insertions(+), 11 deletions(-) diff --git a/test/test_event_streams.js b/test/test_event_streams.js index a6d82cb..39df30f 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -55,7 +55,7 @@ describe('Event Streams', function() { ws.on('error', done); }); - it('unsubscribing to a topic receives a unsubscription-ack', function(idx, done) { + itBoth('unsubscribing to a topic receives a unsubscription-ack', function(idx, done) { var endpoint = urls[idx]; var ws = new WebSocket('ws://' + endpoint + baseUrl); ws.on('open', function() { @@ -77,13 +77,71 @@ describe('Event Streams', function() { ws.on('error', done); }); - it('verify error message format', function(){}) + itBoth('verify error message format', function(){}) - it('specific topic subscription only receives messages with that topic', function() {}) + itBoth('specific topic subscription only receives messages witBothh that topic', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var topic = 'hub/led/1234/state'; + ws.on('open', function() { + var msg = { action: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert.equal(json.subscriptionId, subscriptionId); + assert(json.data); + done(); + } + }); + }); + ws.on('error', done); + }); - it('wildcard topic receives all messages for all topics', function() {}) + itBoth('wildcard topic receives all messages for all topics', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/led/*/state'; + ws.on('open', function() { + var msg = { action: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert.equal(json.subscriptionId, subscriptionId); + assert(json.data); + count++; + if(count === 2) { + done(); + } + } + }); + }); + ws.on('error', done); + }); - it('topic that doesnt exist still opens stream', function(idx, done) { + itBoth('topic that doesnt exist still opens stream', function(idx, done) { var endpoint = urls[idx]; var ws = new WebSocket('ws://' + endpoint + baseUrl); ws.on('open', function() { @@ -99,15 +157,117 @@ describe('Event Streams', function() { }); }); ws.on('error', done); - }) - - it('wildcard and specific topic will each publish a message on a subscription', function() {}) + }); - it('topic format is {server}/{type}/{id}/{streamName}', function() {}) + itBoth('wildcard and specific topic will each publish a message on a subscription', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var ackCount = 0; + var topicOne = 'hub/led/*/state'; + var topicTwo = 'hub/led/1234/state'; + var data = null; + ws.on('open', function() { + var msgOne = { action: 'subscribe', topic: topicOne }; + var msgTwo = { action: 'subscribe', topic: topicTwo }; + ws.send(JSON.stringify(msgOne)); + ws.send(JSON.stringify(msgTwo)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + ackCount++; + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + assert.equal(json.data, data); + count++; + if(count === 2) { + assert.equal(ackCount, 2); + done(); + } + } + }); + }); + ws.on('error', done); + }); - it('adding limit to subscription should limit number of messages received', function(){}) + itBoth('adding limit to subscription should limit number of messages received', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/led/1234/state'; + var data = null; + ws.on('open', function() { + var msg = { action: 'subscribe', topic: topic, limit: 10 }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId, subscriptionId); + assert.equal(json.data); + count++; + if(count === 10) { + done(); + } + } + }); + }); + ws.on('error', done); + }); - it('when limit is reached a unsubscribe-ack should be received', function(){}) + itBoth('when limit is reached a unsubscribe-ack should be received', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/led/1234/state'; + var data = null; + ws.on('open', function() { + var msg = { action: 'subscribe', topic: topic, limit: 10 }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + } else if(json.type === 'event') { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId, subscriptionId); + assert.equal(json.data); + count++; + } else if(json.type === 'unsubscribe-ack') { + assert.equal(json.type, 'unsubscribe-ack'); + assert(timestamp); + assert.equal(json.subscriptionId, subscriptionId); + done(); + } + }); + }); + ws.on('error', done); + }); }); From 0365724d055e60e417bae4e3daed9c97aef764b6 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Thu, 1 Oct 2015 13:31:26 -0400 Subject: [PATCH 03/26] Added stream topic class for websocket stream protocol, and tests for parsing functionality. --- lib/api_resources/root.js | 4 ++++ lib/event_socket.js | 2 ++ lib/http_server.js | 7 ++++++- test/test_event_streams.js | 2 +- 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/api_resources/root.js b/lib/api_resources/root.js index 6f0ddd0..1535e41 100644 --- a/lib/api_resources/root.js +++ b/lib/api_resources/root.js @@ -225,6 +225,10 @@ RootResource.prototype._renderRoot = function(env, next) { title: this.server._name, rel: [rels.server], href: env.helpers.url.path('/servers/' + encodeURI(this.server.id) ) + }, + { + rel: [rels.events], + href: env.helpers.url.path('/events').replace(/^http/, 'ws') } ], actions: [ diff --git a/lib/event_socket.js b/lib/event_socket.js index 73c0e7a..f6918a6 100644 --- a/lib/event_socket.js +++ b/lib/event_socket.js @@ -4,6 +4,8 @@ var ObjectStream = require('zetta-streams').ObjectStream; var buildDeviceActions = require('./api_formats/siren/device.siren').buildActions; var deviceFormatter = require('./api_formats/siren/device.siren'); +//Flag to indicate that we expect data back on teh websocket +//Tracking subscriptions var EventSocket = module.exports = function(ws, query) { EventEmitter.call(this); this.ws = ws; diff --git a/lib/http_server.js b/lib/http_server.js index b0f9d97..58c5f01 100644 --- a/lib/http_server.js +++ b/lib/http_server.js @@ -102,7 +102,12 @@ var ZettaHttpServer = module.exports = function(zettaInstance) { var client = new EventSocket(ws, query); self.eventBroker.client(client); } else { - self.setupEventSocket(ws); + var parsedUrl = url.parse(ws.upgradeReq.url, true); + if(parsedUrl.pathname === '/events') { + + } else { + self.setupEventSocket(ws); + } } }); } diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 39df30f..7a35c83 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -63,7 +63,7 @@ describe('Event Streams', function() { ws.send(JSON.stringify(msg)); ws.on('message', function(buffer) { var json = JSON.parse(buffer); - var msg = { action: 'unsubscribe', subscriptionId, json.subscriptionId }; + var msg = { action: 'unsubscribe', subscriptionId: json.subscriptionId }; ws.send(JSON.stringify(msg)); ws.on('message', function(buffer) { var json2 = JSON.parse(buffer); From 0daba395e820b84ec9ac320aa05f10fee24acd37 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Thu, 1 Oct 2015 13:42:04 -0400 Subject: [PATCH 04/26] Added stream topic stuff. --- lib/stream_topic.js | 44 +++++++++++++++++++++++++++++++++++++++ test/test_stream_topic.js | 32 ++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) create mode 100644 lib/stream_topic.js create mode 100644 test/test_stream_topic.js diff --git a/lib/stream_topic.js b/lib/stream_topic.js new file mode 100644 index 0000000..9a5841d --- /dev/null +++ b/lib/stream_topic.js @@ -0,0 +1,44 @@ +var StreamTopic = module.exports = function() { + +} + +StreamTopic.prototype.parse = function(topicString){ + var previousCharacter = null; + var currentCharacter = null; + var start = 0; + var topicComponents = []; + for(var i = 0; i < topicString.length; i++) { + currentCharacter = topicString[i]; + if(currentCharacter === '/' && previousCharacter !== '\\') { + topicComponents.push(topicString.slice(start, i)); + start = i + 1; + } else if(i === topicString.length - 1) { + topicComponents.push(topicString.slice(start, topicString.length)); + } + } + + if(topicComponents.length !== 4) { + throw new Error('Topic Parse Error'); + } + + function checkForRegex(s) { + if(s[0] === '{' && s[s.length - 1] === '}') { + return new RegExp(s.slice(1, -1)); + } else { + return s; + } + } + + this.serverName = checkForRegex(topicComponents[0]); + this.deviceType = checkForRegex(topicComponents[1]); + this.deviceId = checkForRegex(topicComponents[2]); + var streamComponents = topicComponents[3].split('?'); + this.streamName = checkForRegex(streamComponents[0]); + this.streamQuery = streamComponents[1]; +} + +StreamTopic.parse = function(topicString) { + var topic = new StreamTopic(); + topic.parse(topicString); + return topic; +} diff --git a/test/test_stream_topic.js b/test/test_stream_topic.js new file mode 100644 index 0000000..7934729 --- /dev/null +++ b/test/test_stream_topic.js @@ -0,0 +1,32 @@ +var StreamTopic = require('../lib/stream_topic'); +var assert = require('assert'); + +describe('Stream Topic', function() { + it('will correctly parse a topic of all valid strings', function() { + var t = new StreamTopic(); + t.parse('Detroit/led/1234/state'); + assert.equal(t.serverName, 'Detroit'); + assert.equal(t.deviceType, 'led'); + assert.equal(t.deviceId, '1234'); + assert.equal(t.streamName, 'state'); + }); + + it('will correctly parse a regex out of a topic string', function() { + var t = new StreamTopic(); + t.parse('{^Det.+$}/led/1234/state'); + assert(t.serverName.test); + assert.equal(t.deviceType, 'led'); + assert.equal(t.deviceId, '1234'); + assert.equal(t.streamName, 'state'); + }); + + it('will correctly parse a query out of a topic string', function() { + var t = new StreamTopic(); + t.parse('Detroit/led/1234/state?select * where data > 80'); + assert.equal(t.serverName, 'Detroit'); + assert.equal(t.deviceType, 'led'); + assert.equal(t.deviceId, '1234'); + assert.equal(t.streamName, 'state'); + assert.equal(t.streamQuery, 'select * where data > 80'); + }); +}); From f10dd38d0b0a11b8ad2e78508b68636621ff4de6 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Thu, 1 Oct 2015 14:26:04 -0400 Subject: [PATCH 05/26] Added a stream parser for the new websocket protocol. --- lib/event_streams_parser.js | 44 ++++++++++++++++++++ test/test_event_stream_parser.js | 70 ++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+) create mode 100644 lib/event_streams_parser.js create mode 100644 test/test_event_stream_parser.js diff --git a/lib/event_streams_parser.js b/lib/event_streams_parser.js new file mode 100644 index 0000000..abe84ff --- /dev/null +++ b/lib/event_streams_parser.js @@ -0,0 +1,44 @@ +var util = require('util'); +var EventEmitter = require('events').EventEmitter; + +var EventStreamParser = module.exports = function() { + EventEmitter.call(this); +}; +util.inherits(EventStreamParser, EventEmitter); + +EventStreamParser.prototype.add = function(buf) { + var json = null; + var self = this; + try { + json = JSON.parse(buf.toString()); + } catch(e) { + self.emit('error', e); + } + + +}; + +EventStreamParser.prototype.validate = function(json) { + var properties = { + 'subscribe': { topic: 'string' }, + 'unsubscribe': { subscriptionId: 'number' }, + 'error': { code: 'number', timestamp: 'number', topic: 'string' }, + 'subscribe-ack': { timestamp: 'number', subscriptionId: 'number', topic: 'string' }, + 'unsubscribe-ack': { timestamp: 'number', subscriptionId: 'number' }, + 'event': { topic: 'string', timestamp: 'number', subscriptionId: 'number'} + } + + var keys = properties[json.type]; + var valid = true; + if(keys) { + Object.keys(keys).forEach(function(key) { + if(typeof json[key] !== keys[key]) { + valid = false; + } + }); + } else { + return false; + } + + return valid; +}; diff --git a/test/test_event_stream_parser.js b/test/test_event_stream_parser.js new file mode 100644 index 0000000..ddd8c47 --- /dev/null +++ b/test/test_event_stream_parser.js @@ -0,0 +1,70 @@ +var EventStreamParser = require('../lib/event_streams_parser'); +var assert = require('assert'); + +describe('Event Stream Parser', function() { + it('validates subscribe messages correctly', function() { + var message = { type: 'subscribe', topic: 'Detroit/led/1234/state' }; + var parser = new EventStreamParser(); + assert(parser.validate(message)); + }); + + it('invalidates subscribe messages correctly', function() { + var message = { type: 'subscribe'}; + var parser = new EventStreamParser(); + assert(!parser.validate(message)); + }); + + it('validates unsubscribe messages correctly', function() { + var message = { type: 'unsubscribe', subscriptionId: 1 }; + var parser = new EventStreamParser(); + assert(parser.validate(message)); + }); + + it('invalidates unsubscribe messages correctly', function() { + var message = { type: 'unsubscribe' }; + var parser = new EventStreamParser(); + assert(!parser.validate(message)); + }); + + it('validates unsubscribe-ack messages correctly', function() { + var message = { type: 'unsubscribe-ack', subscriptionId: 1, timestamp: 1 }; + var parser = new EventStreamParser(); + assert(parser.validate(message)); + }); + + it('invalidates unsubscribe-ack messages correctly no subscriptionId', function() { + var message = { type: 'unsubscribe-ack', timestamp: 1 }; + var parser = new EventStreamParser(); + assert(!parser.validate(message)); + }); + + it('invalidates unsubscribe-ack messages correctly no timestamp no subscriptionId', function() { + var message = { type: 'unsubscribe-ack' }; + var parser = new EventStreamParser(); + assert(!parser.validate(message)); + }); + + it('invalidates unsubscribe-ack messages correctly no timestamp', function() { + var message = { type: 'unsubscribe-ack', timestamp: 1 }; + var parser = new EventStreamParser(); + assert(!parser.validate(message)); + }); + + it('validates subscribe-ack messages correctly', function() { + var message = { type: 'unsubscribe-ack', timestamp: 1, topic: 'Detroit/led/1234/state', subscriptionId: 1}; + var parser = new EventStreamParser(); + assert(parser.validate(message)); + }); + + it('validates error messages correctly', function() { + var message = { type: 'error', code: 1, timestamp: 1, topic: 'Detroit/led/1234/state' }; + var parser = new EventStreamParser(); + assert(parser.validate(message)); + }); + + it('validates event messages correctly', function() { + var message = { type: 'event', timestamp: 1, topic: 'Detroit/led/1234/state', subscriptionId: 1 }; + var parser = new EventStreamParser(); + assert(parser.validate(message)); + }); +}); From 97d42ff191321660868e22274ef552eb39c6559c Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Thu, 1 Oct 2015 16:22:15 -0400 Subject: [PATCH 06/26] EventSocket parses incoming messages when flag is set. --- lib/event_socket.js | 96 +++++++++++++++++++++++++++++--- lib/event_streams_parser.js | 9 ++- test/test_event_socket.js | 18 ++++++ test/test_event_stream_parser.js | 34 +++++++++++ 4 files changed, 147 insertions(+), 10 deletions(-) diff --git a/lib/event_socket.js b/lib/event_socket.js index f6918a6..1a7172a 100644 --- a/lib/event_socket.js +++ b/lib/event_socket.js @@ -1,12 +1,14 @@ var util = require('util'); var EventEmitter = require('events').EventEmitter; var ObjectStream = require('zetta-streams').ObjectStream; +var EventStreamsParser = require('./event_streams_parser'); +var StreamTopic = require('./stream_topic'); var buildDeviceActions = require('./api_formats/siren/device.siren').buildActions; var deviceFormatter = require('./api_formats/siren/device.siren'); //Flag to indicate that we expect data back on teh websocket //Tracking subscriptions -var EventSocket = module.exports = function(ws, query) { +var EventSocket = module.exports = function(ws, query, streamEnabled) { EventEmitter.call(this); this.ws = ws; @@ -14,6 +16,86 @@ var EventSocket = module.exports = function(ws, query) { query = [query]; } this.query = query; // contains .topic, .name + + // list of event streams + this._subscriptions = []; + this._subscriptionIndex = 0; + + // only setup parser when using event stream + if (streamEnabled) { + var self = this; + this._parser = new EventStreamsParser(); + this._parser.on('error', function(err, original) { + var msg = { + type: 'error', + code: 400, + timestamp: new Date().getTime(), + topic: (typeof original === 'object') ? original.topic : null, + message: err + }; + self.ws.send(JSON.stringify(msg)); + }); + + this._parser.on('subscribe', function(msg) { + var topic = new StreamTopic(); + try { + topic.parse(msg.topic); + } catch(err) { + var msg = { + type: 'error', + code: 400, + timestamp: new Date().getTime(), + topic: msg.topic, + message: err + }; + self.ws.send(JSON.stringify(msg)); + return; + } + + var subscription = { subscriptionId: ++self._subscriptionIndex, topic: topic, limit: msg.limit }; + self._subscriptions.push(subscription); + + var msg = { + type: 'subscription-ack', + timestamp: new Date().getTime(), + topic: msg.topic, + subscriptionId: subscription.subscriptionId + }; + self.ws.send(JSON.stringify(msg)); + self.emit('subscribe', subscription); + }); + + this._parser.on('unsubscribe', function(msg) { + var foundIdx = -1; + self._subscriptions.some(function(subscription, idx) { + if(subscription.subscriptionId === msg.subscriptionId) { + foundIdx = idx; + return true; + } + }); + + if (foundIdx < 0) { + var msg = { + type: 'error', + code: 405, + timestamp: new Date().getTime(), + message: new Error('Unable to unsubscribe from invalid subscriptionId') + }; + self.ws.send(JSON.stringify(msg)); + return; + } + + var subscription = self._subscriptions.splice(foundIdx, 1); + var msg = { + type: 'unsubscription-ack', + timestamp: new Date().getTime(), + subscriptionId: subscription.subscriptionId + }; + + self.emit('unsubscribe', subscription); + }); + } + this.init(); }; util.inherits(EventSocket, EventEmitter); @@ -52,19 +134,17 @@ EventSocket.prototype.send = function(topic, data) { this.ws.send.apply(this.ws, args); }; -EventSocket.prototype.onData = function(data) { - var args = ['data'].concat(Array.prototype.slice.call(arguments)); - // @todo handle remote devices publishing data on the websocket - this.emit.apply(this, args); -}; - EventSocket.prototype.onClose = function() { this.emit('close'); }; EventSocket.prototype.init = function() { var self = this; - this.ws.on('message', this.onData.bind(this)); + this.ws.on('message', function(buffer) { + if (self._parser) { + self._parser.add(buffer); + } + }); this.ws.on('close', this.onClose.bind(this)); this.ws.on('error',function(err){ console.error('ws error:', err); diff --git a/lib/event_streams_parser.js b/lib/event_streams_parser.js index abe84ff..6820d34 100644 --- a/lib/event_streams_parser.js +++ b/lib/event_streams_parser.js @@ -12,10 +12,15 @@ EventStreamParser.prototype.add = function(buf) { try { json = JSON.parse(buf.toString()); } catch(e) { - self.emit('error', e); + self.emit('error', e, buf); + return; } - + if (this.validate(json)) { + this.emit(json.type, json); + } else { + this.emit('error', new Error('Message validation failed.'), json); + } }; EventStreamParser.prototype.validate = function(json) { diff --git a/test/test_event_socket.js b/test/test_event_socket.js index 967f264..266f46d 100644 --- a/test/test_event_socket.js +++ b/test/test_event_socket.js @@ -67,6 +67,24 @@ describe('EventSocket', function() { },1); }); + it('should init parser if passed streaming flag', function() { + var ws = new Ws(); + var client = new EventSocket(ws, 'some-topic', true); + assert(client._parser) + }) + + it('should emit subscribe event when subscribe message is parsed', function(done) { + var ws = new Ws(); + var client = new EventSocket(ws, 'some-topic', true); + client.on('subscribe', function(subscription) { + assert(subscription.subscriptionId); + assert(subscription.topic); + assert.equal(subscription.limit, 10); + done(); + }) + var msg = { type: 'subscribe', topic: 'Detroit/led/1234/state', limit: 10}; + ws.emit('message', new Buffer(JSON.stringify(msg))); + }) }); diff --git a/test/test_event_stream_parser.js b/test/test_event_stream_parser.js index ddd8c47..a0ea964 100644 --- a/test/test_event_stream_parser.js +++ b/test/test_event_stream_parser.js @@ -67,4 +67,38 @@ describe('Event Stream Parser', function() { var parser = new EventStreamParser(); assert(parser.validate(message)); }); + + it('should emit event for message type when parsing buffer', function(done) { + var parser = new EventStreamParser(); + var message = { type: 'event', timestamp: 1, topic: 'Detroit/led/1234/state', subscriptionId: 1 }; + parser.on('event', function(msg) { + assert.equal(msg.type, message.type); + assert.equal(msg.timestamp, message.timestamp); + assert.equal(msg.topic, message.topic); + assert.equal(msg.subscriptionId, message.subscriptionId); + done(); + }); + + parser.add(new Buffer(JSON.stringify(message))); + }) + + it('should emit error for invalid message type when parsing buffer', function(done) { + var parser = new EventStreamParser(); + var message = { type: 'not-a-message', timestamp: 1, topic: 'Detroit/led/1234/state', subscriptionId: 1 }; + parser.on('error', function(msg) { + done(); + }); + + parser.add(new Buffer(JSON.stringify(message))); + }) + + it('should emit error for invalid JSON when parsing buffer', function(done) { + var parser = new EventStreamParser(); + parser.on('error', function(msg) { + done(); + }); + + parser.add(new Buffer('some text')); + }) + }); From 5b414b424d17762e980a99fb0deb3483baa14042 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 2 Oct 2015 11:02:08 -0400 Subject: [PATCH 07/26] Wiring up protocol parser to websockets. --- lib/event_socket.js | 20 +++++++++++--------- lib/event_streams_parser.js | 6 +++++- lib/http_server.js | 18 ++++++++++-------- package.json | 2 +- test/test_event_streams.js | 29 +++++++++++++++-------------- 5 files changed, 42 insertions(+), 33 deletions(-) diff --git a/lib/event_socket.js b/lib/event_socket.js index 1a7172a..9681570 100644 --- a/lib/event_socket.js +++ b/lib/event_socket.js @@ -11,12 +11,8 @@ var deviceFormatter = require('./api_formats/siren/device.siren'); var EventSocket = module.exports = function(ws, query, streamEnabled) { EventEmitter.call(this); this.ws = ws; + this.query = []; - if (!Array.isArray(query)) { - query = [query]; - } - this.query = query; // contains .topic, .name - // list of event streams this._subscriptions = []; this._subscriptionIndex = 0; @@ -31,7 +27,7 @@ var EventSocket = module.exports = function(ws, query, streamEnabled) { code: 400, timestamp: new Date().getTime(), topic: (typeof original === 'object') ? original.topic : null, - message: err + message: err.message }; self.ws.send(JSON.stringify(msg)); }); @@ -56,7 +52,7 @@ var EventSocket = module.exports = function(ws, query, streamEnabled) { self._subscriptions.push(subscription); var msg = { - type: 'subscription-ack', + type: 'subscribe-ack', timestamp: new Date().getTime(), topic: msg.topic, subscriptionId: subscription.subscriptionId @@ -85,15 +81,21 @@ var EventSocket = module.exports = function(ws, query, streamEnabled) { return; } - var subscription = self._subscriptions.splice(foundIdx, 1); + var subscription = self._subscriptions.splice(foundIdx, 1)[0]; var msg = { - type: 'unsubscription-ack', + type: 'unsubscribe-ack', timestamp: new Date().getTime(), subscriptionId: subscription.subscriptionId }; self.emit('unsubscribe', subscription); + self.ws.send(JSON.stringify(msg)); }); + } else { + if (!Array.isArray(query)) { + query = [query]; + } + this.query = query; // contains .topic, .name } this.init(); diff --git a/lib/event_streams_parser.js b/lib/event_streams_parser.js index 6820d34..a2618d9 100644 --- a/lib/event_streams_parser.js +++ b/lib/event_streams_parser.js @@ -10,7 +10,11 @@ EventStreamParser.prototype.add = function(buf) { var json = null; var self = this; try { - json = JSON.parse(buf.toString()); + if(Buffer.isBuffer(buf)) { + json = JSON.parse(buf.toString()); + } else { + json = JSON.parse(buf); + } } catch(e) { self.emit('error', e, buf); return; diff --git a/lib/http_server.js b/lib/http_server.js index 58c5f01..8fe6d89 100644 --- a/lib/http_server.js +++ b/lib/http_server.js @@ -102,12 +102,7 @@ var ZettaHttpServer = module.exports = function(zettaInstance) { var client = new EventSocket(ws, query); self.eventBroker.client(client); } else { - var parsedUrl = url.parse(ws.upgradeReq.url, true); - if(parsedUrl.pathname === '/events') { - - } else { - self.setupEventSocket(ws); - } + self.setupEventSocket(ws); } }); } @@ -230,8 +225,15 @@ ZettaHttpServer.prototype.setupEventSocket = function(ws) { if (/^\/events/.exec(ws.upgradeReq.url)) { self.wireUpWebSocketForEvent(ws, host, '/servers/' + self.zetta._name); + var queryString = url.parse(ws.upgradeReq.url).query; - var query = querystring.parse(url.parse(ws.upgradeReq.url).query); + if(!queryString) { + var client = new EventSocket(ws, null, true); + self.eventBroker.client(client); + return; + } + + var query = querystring.parse(queryString); function copy(q) { var c = {}; @@ -251,7 +253,7 @@ ZettaHttpServer.prototype.setupEventSocket = function(ws) { if (qt) { q.topic = querytopic.format(qt); } - var client = new EventSocket(ws, q); + var client = new EventSocket(ws, q, false); self.eventBroker.client(client); } }); diff --git a/package.json b/package.json index f4c6a1f..fdc8547 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,7 @@ "zetta-auto-scout": "^0.9.0", "zetta-device": "^0.17.0", "zetta-http-device": "^0.4.0", - "zetta-rels": "^0.4.0", + "zetta-rels": "^0.5.0", "zetta-scientist": "^0.5.0", "zetta-scout": "^0.6.0", "zetta-streams": "^0.3.0" diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 7a35c83..6edc8e7 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -41,7 +41,7 @@ describe('Event Streams', function() { var endpoint = urls[idx]; var ws = new WebSocket('ws://' + endpoint + baseUrl); ws.on('open', function() { - var msg = { action: 'subscribe', topic: 'hub/led/1234/state' }; + var msg = { type: 'subscribe', topic: 'hub/led/1234/state' }; ws.send(JSON.stringify(msg)); ws.on('message', function(buffer) { var json = JSON.parse(buffer); @@ -59,11 +59,11 @@ describe('Event Streams', function() { var endpoint = urls[idx]; var ws = new WebSocket('ws://' + endpoint + baseUrl); ws.on('open', function() { - var msg = { action: 'subscribe', topic: 'hub/led/1234/state' }; + var msg = { type: 'subscribe', topic: 'hub/led/1234/state' }; ws.send(JSON.stringify(msg)); - ws.on('message', function(buffer) { + ws.once('message', function(buffer) { var json = JSON.parse(buffer); - var msg = { action: 'unsubscribe', subscriptionId: json.subscriptionId }; + var msg = { type: 'unsubscribe', subscriptionId: json.subscriptionId }; ws.send(JSON.stringify(msg)); ws.on('message', function(buffer) { var json2 = JSON.parse(buffer); @@ -77,15 +77,15 @@ describe('Event Streams', function() { ws.on('error', done); }); - itBoth('verify error message format', function(){}) + itBoth('verify error message format', function(){}); - itBoth('specific topic subscription only receives messages witBothh that topic', function(idx, done) { + itBoth('specific topic subscription only receives messages with that topic', function(idx, done) { var endpoint = urls[idx]; var ws = new WebSocket('ws://' + endpoint + baseUrl); var subscriptionId = null; var topic = 'hub/led/1234/state'; ws.on('open', function() { - var msg = { action: 'subscribe', topic: topic }; + var msg = { type: 'subscribe', topic: topic }; ws.send(JSON.stringify(msg)); ws.on('message', function(buffer) { var json = JSON.parse(buffer); @@ -115,7 +115,7 @@ describe('Event Streams', function() { var count = 0; var topic = 'hub/led/*/state'; ws.on('open', function() { - var msg = { action: 'subscribe', topic: topic }; + var msg = { type: 'subscribe', topic: topic }; ws.send(JSON.stringify(msg)); ws.on('message', function(buffer) { var json = JSON.parse(buffer); @@ -144,14 +144,15 @@ describe('Event Streams', function() { itBoth('topic that doesnt exist still opens stream', function(idx, done) { var endpoint = urls[idx]; var ws = new WebSocket('ws://' + endpoint + baseUrl); + var topic = 'blah/foo/1/blah'; ws.on('open', function() { - var msg = { action: 'subscribe', topic: 'random_topic' }; + var msg = { type: 'subscribe', topic: topic }; ws.send(JSON.stringify(msg)); ws.on('message', function(buffer) { var json = JSON.parse(buffer); assert.equal(json.type, 'subscribe-ack'); assert(json.timestamp); - assert.equal(json.topic, 'random_topic'); + assert.equal(json.topic, topic); assert(json.subscriptionId); done(); }); @@ -169,8 +170,8 @@ describe('Event Streams', function() { var topicTwo = 'hub/led/1234/state'; var data = null; ws.on('open', function() { - var msgOne = { action: 'subscribe', topic: topicOne }; - var msgTwo = { action: 'subscribe', topic: topicTwo }; + var msgOne = { type: 'subscribe', topic: topicOne }; + var msgTwo = { type: 'subscribe', topic: topicTwo }; ws.send(JSON.stringify(msgOne)); ws.send(JSON.stringify(msgTwo)); ws.on('message', function(buffer) { @@ -207,7 +208,7 @@ describe('Event Streams', function() { var topic = 'hub/led/1234/state'; var data = null; ws.on('open', function() { - var msg = { action: 'subscribe', topic: topic, limit: 10 }; + var msg = { type: 'subscribe', topic: topic, limit: 10 }; ws.send(JSON.stringify(msg)); ws.on('message', function(buffer) { var json = JSON.parse(buffer); @@ -241,7 +242,7 @@ describe('Event Streams', function() { var topic = 'hub/led/1234/state'; var data = null; ws.on('open', function() { - var msg = { action: 'subscribe', topic: topic, limit: 10 }; + var msg = { type: 'subscribe', topic: topic, limit: 10 }; ws.send(JSON.stringify(msg)); ws.on('message', function(buffer) { var json = JSON.parse(buffer); From 53334754bd5445a6029dd669308ebb7ef1bd5e60 Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Fri, 2 Oct 2015 16:10:15 -0400 Subject: [PATCH 08/26] StreamTopic accepts any topic string, PubSub converts topic string to StreamTopic class which is returned in callback. --- lib/event_broker.js | 45 +++++++++----- lib/event_socket.js | 5 +- lib/pubsub_service.js | 120 ++++++++++++++++++++----------------- lib/stream_topic.js | 46 +++++++++++--- test/test_api.js | 2 +- test/test_driver.js | 1 + test/test_event_broker.js | 3 - test/test_event_streams.js | 19 +++++- test/test_stream_topic.js | 17 ++++++ 9 files changed, 173 insertions(+), 85 deletions(-) diff --git a/lib/event_broker.js b/lib/event_broker.js index ca72ba6..e2174c8 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -70,12 +70,27 @@ EventBroker.prototype.client = function(client) { return stillValid && client.ws === cl.ws; }); - if (c.length > 0) { - return; - } + if (client.streamEnabled) { + function generateQuery(topic) { + return { + name: topic.serverName, + topic: topic.deviceType + '/' + topic.deviceId + '/' + topic.streamName, + }; + } + client.on('subscribe', function(subscription) { + var query = generateQuery(subscription.topic); + query.subscriptionId = subscription.subscriptionId; + self._subscribe(query); + client.query.push(query); + }); + client.on('unsubscribe', function(subscription) { +// var query = generateQuery(subscription.topic); +// self._unsubscribe(query); + }); + } + this.clients.push(client); - client.on('close', function() { client.query.forEach(self._unsubscribe.bind(self)); var idx = self.clients.indexOf(client); @@ -110,7 +125,6 @@ EventBroker.prototype._subscribe = function(query) { this.subscriptions[topic].count++; } else { - if (!this._peerSubscriptions[query.name]) { this._peerSubscriptions[query.name] = {}; } @@ -172,19 +186,22 @@ EventBroker.prototype._unsubscribe = function(query) { } }; - EventBroker.prototype._publish = function(topic, data) { this.clients.forEach(function(client) { client.query.forEach(function(query) { - if (!topicMatch(topic, query.topic)) { - return; - } - - client.send(topic, data, function(err){ - if (err) { - console.error('ws error: '+err); + if (topic.match(query.topic)) { + if (client.streamEnabled) { + data.type = 'event'; + data.topic = query.name + '/' + data.topic; + data.subscriptionId = query.subscriptionId; } - }); + + client.send(topic, data, function(err){ + if (err) { + console.error('ws error: '+err); + } + }); + } }); }); }; diff --git a/lib/event_socket.js b/lib/event_socket.js index 9681570..c0d2571 100644 --- a/lib/event_socket.js +++ b/lib/event_socket.js @@ -16,7 +16,8 @@ var EventSocket = module.exports = function(ws, query, streamEnabled) { // list of event streams this._subscriptions = []; this._subscriptionIndex = 0; - + this.streamEnabled = !!(streamEnabled); + // only setup parser when using event stream if (streamEnabled) { var self = this; @@ -143,7 +144,7 @@ EventSocket.prototype.onClose = function() { EventSocket.prototype.init = function() { var self = this; this.ws.on('message', function(buffer) { - if (self._parser) { + if (self.streamEnabled) { self._parser.add(buffer); } }); diff --git a/lib/pubsub_service.js b/lib/pubsub_service.js index f1c0063..dd40757 100644 --- a/lib/pubsub_service.js +++ b/lib/pubsub_service.js @@ -1,4 +1,5 @@ var EventEmitter = require('events').EventEmitter; +var StreamTopic = require('./stream_topic'); var deviceFormatter = require('./api_formats/siren/device.siren'); var PubSub = module.exports = function() { @@ -9,33 +10,45 @@ var PubSub = module.exports = function() { PubSub.prototype.publish = function(topic, data) { var x = decodeURIComponent(topic); this.emitter.emit(x, data); + this.emitter.emit('_data', x, data); }; PubSub.prototype.subscribe = function(topic, callback) { - var f = null; - if (typeof callback === 'function') { - f = this._onCallback(topic, callback); - this.emitter.on(topic, f); - } else if (typeof callback === 'object') { - f = this._onResponse(topic, callback); - this.emitter.on(topic, f); - } else { - return; + var self = this; + if (typeof topic === 'string') { + topic = StreamTopic.parse(topic); } + + var f = function(t, data) { + if (topic.match(t)) { + if (typeof callback === 'function') { + self._onCallback(topic, data, callback); + } else if (typeof callback === 'object') { + self._onResponse(topic, data, callback); + } + } + }; + + this.emitter.on('_data', f); - if (!this._listeners[topic]) { - this._listeners[topic] = []; + if (!this._listeners[topic.hash()]) { + this._listeners[topic.hash()] = []; } - - this._listeners[topic].push({ listener: callback, actual: f }); + + this._listeners[topic.hash()].push({ listener: callback, actual: f }); }; PubSub.prototype.unsubscribe = function(topic, listener) { - if (!this._listeners[topic]) { + if (typeof topic === 'string') { + topic = StreamTopic.parse(topic); + } + + if (!this._listeners[topic.hash()]) { return; } + var found = -1; - this._listeners[topic].some(function(l, idx) { + this._listeners[topic.hash()].some(function(l, idx) { if (l.listener === listener) { found = idx; return true; @@ -50,59 +63,54 @@ PubSub.prototype.unsubscribe = function(topic, listener) { listener.response.end(); // end response for push request } - this.emitter.removeListener(topic, this._listeners[topic][found].actual); - this._listeners[topic].splice(found, 1); + this.emitter.removeListener('_data', this._listeners[topic.hash()][found].actual); + this._listeners[topic.hash()].splice(found, 1); - if (this._listeners[topic].length === 0) { - delete this._listeners[topic]; + if (this._listeners[topic.hash()].length === 0) { + delete this._listeners[topic.hash()]; } - }; -PubSub.prototype._onCallback = function(topic, cb) { +PubSub.prototype._onCallback = function(topic, data, cb) { var self = this; - return function(data, options) { - cb(topic, data); - }; + cb(topic, data); }; -PubSub.prototype._onResponse = function(topic, env) { +PubSub.prototype._onResponse = function(topic, data, env) { var self = this; - return function(data) { - var encoding = ''; - if(Buffer.isBuffer(data)) { - encoding = 'application/octet-stream'; - } else if (data.query && data.device) { - var serverId = env.route.params.serverId; - var loader = { path: '/servers/' + encodeURIComponent(serverId) }; - data = deviceFormatter({ loader: loader, env: env, model: data.device }); + var encoding = ''; + if(Buffer.isBuffer(data)) { + encoding = 'application/octet-stream'; + } else if (data.query && data.device) { + var serverId = env.route.params.serverId; + var loader = { path: '/servers/' + encodeURIComponent(serverId) }; + data = deviceFormatter({ loader: loader, env: env, model: data.device }); + data = new Buffer(JSON.stringify(data)); + } else if (typeof data == 'object') { + encoding = 'application/json'; + try { data = new Buffer(JSON.stringify(data)); - } else if (typeof data == 'object') { - encoding = 'application/json'; - try { - data = new Buffer(JSON.stringify(data)); - } catch (err) { - console.error(err, err.stack); - return; - } - } else { - console.error('PubSub._onResponse encoding not set.'); + } catch (err) { + console.error(err, err.stack); + return; } + } else { + console.error('PubSub._onResponse encoding not set.'); + } - var stream = env.response.push('/' + topic, { 'Host': encodeURIComponent(serverId) + '.unreachable.zettajs.io', - 'Content-Length': data.length, - 'Content-Type': encoding - }); + var stream = env.response.push('/' + topic.hash(), { 'Host': encodeURIComponent(serverId) + '.unreachable.zettajs.io', + 'Content-Length': data.length, + 'Content-Type': encoding + }); - stream.on('error', function(err) { - if (err.code === 'RST_STREAM' && err.status === 3) { - stream.end(); - } else { - console.error('PubSub._onCallback', err); - } - }); + stream.on('error', function(err) { + if (err.code === 'RST_STREAM' && err.status === 3) { + stream.end(); + } else { + console.error('PubSub._onCallback', err); + } + }); - stream.end(data); - }; + stream.end(data); }; diff --git a/lib/stream_topic.js b/lib/stream_topic.js index 9a5841d..76d1eec 100644 --- a/lib/stream_topic.js +++ b/lib/stream_topic.js @@ -1,8 +1,16 @@ var StreamTopic = module.exports = function() { - + this.serverName = null; + this.deviceType = null; + this.deviceId = null; + this.streamName = null; + this.streamQuery = null; + this._original = null; + this._useComponents = false; } StreamTopic.prototype.parse = function(topicString){ + this._original = topicString; + var previousCharacter = null; var currentCharacter = null; var start = 0; @@ -17,18 +25,30 @@ StreamTopic.prototype.parse = function(topicString){ } } - if(topicComponents.length !== 4) { - throw new Error('Topic Parse Error'); + if (topicComponents.length < 3) { + return; + } + + if (topicComponents.length === 3) { + topicComponents.unshift(null); } function checkForRegex(s) { - if(s[0] === '{' && s[s.length - 1] === '}') { - return new RegExp(s.slice(1, -1)); - } else { - return s; + if (typeof s === 'string') { + if(s[0] === '{' && s[s.length - 1] === '}') { + return new RegExp(s.slice(1, -1)); + } else { + return s; + } } } + // led/123/state + // _peer/connect + // _peer/disconnect + // query:asdasd + // query/asd + this._useComponents = true; this.serverName = checkForRegex(topicComponents[0]); this.deviceType = checkForRegex(topicComponents[1]); this.deviceId = checkForRegex(topicComponents[2]); @@ -37,6 +57,18 @@ StreamTopic.prototype.parse = function(topicString){ this.streamQuery = streamComponents[1]; } +StreamTopic.prototype.hash = function() { + return this._original; +}; + +StreamTopic.prototype.match = function(topic) { + if (!this._useComponents) { + return topic === this._original; + } else { + return topic === this._original; + } +}; + StreamTopic.parse = function(topicString) { var topic = new StreamTopic(); topic.parse(topicString); diff --git a/test/test_api.js b/test/test_api.js index d00b135..767efc1 100644 --- a/test/test_api.js +++ b/test/test_api.js @@ -338,7 +338,7 @@ describe('Zetta Api', function() { request(getHttpServer(app)) .get('/') .expect(getBody(function(res, body) { - assert.equal(body.links.length, 3); + assert.equal(body.links.length, 4); hasLinkRel(body.links, 'self'); })) .end(done) diff --git a/test/test_driver.js b/test/test_driver.js index 2f3be5c..4a3d306 100644 --- a/test/test_driver.js +++ b/test/test_driver.js @@ -158,6 +158,7 @@ describe('Driver', function() { var recv = 0; pubsub.subscribe(topic, function(topic, msg) { + console.log(topic, msg) assert.ok(msg.timestamp); assert.ok(msg.topic); assert.ok(!msg.data); diff --git a/test/test_event_broker.js b/test/test_event_broker.js index d06d4d1..372da20 100644 --- a/test/test_event_broker.js +++ b/test/test_event_broker.js @@ -112,9 +112,6 @@ describe('EventBroker', function() { clientA.emit('close'); - done(); - return; - setTimeout(function() { assert.equal(recievedA, 1); assert.equal(recievedB, 2); diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 6edc8e7..13bd0c7 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -8,16 +8,27 @@ describe('Event Streams', function() { var cluster = null; var urls = []; var baseUrl = '/events'; + var devices = []; + var validTopics = []; beforeEach(function(done) { urls = []; + devices = []; + validTopics = []; cluster = zettacluster({ zetta: zetta }) .server('cloud') - .server('hub', [Driver], ['cloud']) + .server('hub', [Driver, Driver], ['cloud']) .on('ready', function() { app = cluster.servers['cloud']; urls.push('localhost:' + cluster.servers['cloud']._testPort); urls.push('localhost:' + cluster.servers['hub']._testPort); + + Object.keys(cluster.servers['hub'].runtime._jsDevices).forEach(function(id) { + var device = cluster.servers['hub'].runtime._jsDevices[id]; + devices.push(device); + validTopics.push('hub/' + device.type + '/' + device.id + '/state'); + }); + done(); }) .run(function(err){ @@ -83,7 +94,7 @@ describe('Event Streams', function() { var endpoint = urls[idx]; var ws = new WebSocket('ws://' + endpoint + baseUrl); var subscriptionId = null; - var topic = 'hub/led/1234/state'; + var topic = validTopics[0]; ws.on('open', function() { var msg = { type: 'subscribe', topic: topic }; ws.send(JSON.stringify(msg)); @@ -95,6 +106,10 @@ describe('Event Streams', function() { assert.equal(json.topic, topic); assert(json.subscriptionId); subscriptionId = json.subscriptionId; + + setTimeout(function() { + devices[0].call('change'); + }, 50); } else { assert.equal(json.type, 'event'); assert(json.timestamp); diff --git a/test/test_stream_topic.js b/test/test_stream_topic.js index 7934729..192d2cf 100644 --- a/test/test_stream_topic.js +++ b/test/test_stream_topic.js @@ -29,4 +29,21 @@ describe('Stream Topic', function() { assert.equal(t.streamName, 'state'); assert.equal(t.streamQuery, 'select * where data > 80'); }); + + it('will correctly parse topics without the leading server name', function() { + var t = new StreamTopic(); + t.parse('led/1234/state'); + assert.equal(t.serverName, null); + assert.equal(t.deviceType, 'led'); + assert.equal(t.deviceId, '1234'); + assert.equal(t.streamName, 'state'); + }) + + it('hash() will return the original input', function() { + var t = new StreamTopic(); + var topic = '{^Det.+$}/led/1234/state?select * where data > 80'; + t.parse(topic); + assert.equal(t.hash(), topic); + }) + }); From 4ad5d8787a1d9c95e59f6b8ded5901312545bb6d Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Tue, 6 Oct 2015 11:27:01 -0400 Subject: [PATCH 09/26] Steam topics use limit parameter --- lib/event_broker.js | 19 ++++++++++-- lib/event_socket.js | 63 ++++++++++++++++++++++---------------- test/test_event_streams.js | 32 ++++++++++++++----- 3 files changed, 78 insertions(+), 36 deletions(-) diff --git a/lib/event_broker.js b/lib/event_broker.js index e2174c8..17d1708 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -81,12 +81,16 @@ EventBroker.prototype.client = function(client) { client.on('subscribe', function(subscription) { var query = generateQuery(subscription.topic); query.subscriptionId = subscription.subscriptionId; + query.limit = subscription.limit; + query.count = 0; + self._subscribe(query); client.query.push(query); }); + client.on('unsubscribe', function(subscription) { -// var query = generateQuery(subscription.topic); -// self._unsubscribe(query); + var query = generateQuery(subscription.topic); + self._unsubscribe(query); }); } @@ -148,6 +152,7 @@ EventBroker.prototype._unsubscribe = function(query) { if (this.subscriptions[topic].count > 0) { return; } + // unsubscribe locally this.zetta.pubsub.unsubscribe(topic, this.subscriptions[topic].listener); delete this.subscriptions[topic]; @@ -187,15 +192,25 @@ EventBroker.prototype._unsubscribe = function(query) { }; EventBroker.prototype._publish = function(topic, data) { + var self = this; this.clients.forEach(function(client) { client.query.forEach(function(query) { if (topic.match(query.topic)) { + if (client.streamEnabled) { data.type = 'event'; data.topic = query.name + '/' + data.topic; data.subscriptionId = query.subscriptionId; } + query.count++; + if (typeof query.limit === 'number' && query.count > query.limit) { + // unsubscribe broker + self._unsubscribe(query); + client._unsubscribe(query.subscriptionId) + return; + } + client.send(topic, data, function(err){ if (err) { console.error('ws error: '+err); diff --git a/lib/event_socket.js b/lib/event_socket.js index c0d2571..2792767 100644 --- a/lib/event_socket.js +++ b/lib/event_socket.js @@ -63,34 +63,11 @@ var EventSocket = module.exports = function(ws, query, streamEnabled) { }); this._parser.on('unsubscribe', function(msg) { - var foundIdx = -1; - self._subscriptions.some(function(subscription, idx) { - if(subscription.subscriptionId === msg.subscriptionId) { - foundIdx = idx; - return true; + self._unsubscribe(msg.subscriptionId, function(err, subscription) { + if (subscription) { + self.emit('unsubscribe', subscription); } }); - - if (foundIdx < 0) { - var msg = { - type: 'error', - code: 405, - timestamp: new Date().getTime(), - message: new Error('Unable to unsubscribe from invalid subscriptionId') - }; - self.ws.send(JSON.stringify(msg)); - return; - } - - var subscription = self._subscriptions.splice(foundIdx, 1)[0]; - var msg = { - type: 'unsubscribe-ack', - timestamp: new Date().getTime(), - subscriptionId: subscription.subscriptionId - }; - - self.emit('unsubscribe', subscription); - self.ws.send(JSON.stringify(msg)); }); } else { if (!Array.isArray(query)) { @@ -103,6 +80,40 @@ var EventSocket = module.exports = function(ws, query, streamEnabled) { }; util.inherits(EventSocket, EventEmitter); +EventSocket.prototype._unsubscribe = function(subscriptionId, cb) { + var self = this; + var foundIdx = -1; + self._subscriptions.some(function(subscription, idx) { + if(subscription.subscriptionId === subscriptionId) { + foundIdx = idx; + return true; + } + }); + + if (foundIdx < 0) { + var msg = { + type: 'error', + code: 405, + timestamp: new Date().getTime(), + message: new Error('Unable to unsubscribe from invalid subscriptionId') + }; + self.ws.send(JSON.stringify(msg)); + return; + } + + var subscription = self._subscriptions.splice(foundIdx, 1)[0]; + var msg = { + type: 'unsubscribe-ack', + timestamp: new Date().getTime(), + subscriptionId: subscription.subscriptionId + }; + + self.ws.send(JSON.stringify(msg)); + if (typeof cb === 'function') { + cb(null, subscription); + } +}; + EventSocket.prototype.send = function(topic, data) { if (!Buffer.isBuffer(data) && typeof data === 'object') { if (data['transitions']) { diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 13bd0c7..8c5e4fe 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -220,7 +220,7 @@ describe('Event Streams', function() { var ws = new WebSocket('ws://' + endpoint + baseUrl); var subscriptionId = null; var count = 0; - var topic = 'hub/led/1234/state'; + var topic = validTopics[0]; var data = null; ws.on('open', function() { var msg = { type: 'subscribe', topic: topic, limit: 10 }; @@ -233,15 +233,25 @@ describe('Event Streams', function() { assert(json.topic); assert(json.subscriptionId); subscriptionId = json.subscriptionId; - } else { + + setTimeout(function() { + for(var i=0; i<11; i++) { + devices[0].call((i % 2 === 0) ? 'change' : 'prepare'); + } + }, 50); + } else if (json.type !== 'unsubscribe-ack') { assert.equal(json.type, 'event'); assert(json.timestamp); assert(json.topic); assert(json.subscriptionId, subscriptionId); - assert.equal(json.data); + assert(json.data); + count++; if(count === 10) { - done(); + setTimeout(function() { + assert.equal(count, 10); + done(); + }, 200) } } }); @@ -254,7 +264,7 @@ describe('Event Streams', function() { var ws = new WebSocket('ws://' + endpoint + baseUrl); var subscriptionId = null; var count = 0; - var topic = 'hub/led/1234/state'; + var topic = validTopics[0]; var data = null; ws.on('open', function() { var msg = { type: 'subscribe', topic: topic, limit: 10 }; @@ -267,18 +277,24 @@ describe('Event Streams', function() { assert(json.topic); assert(json.subscriptionId); subscriptionId = json.subscriptionId; + setTimeout(function() { + for(var i=0; i<11; i++) { + devices[0].call((i % 2 === 0) ? 'change' : 'prepare'); + } + }, 50); } else if(json.type === 'event') { assert.equal(json.type, 'event'); assert(json.timestamp); assert(json.topic); assert(json.subscriptionId, subscriptionId); - assert.equal(json.data); + assert(json.data); count++; } else if(json.type === 'unsubscribe-ack') { assert.equal(json.type, 'unsubscribe-ack'); - assert(timestamp); + assert(json.timestamp); assert.equal(json.subscriptionId, subscriptionId); - done(); + assert.equal(count, 10); + done(); } }); }); From 70ba640d30615c16d1a1f9c15da7e69890d5b9dc Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Tue, 6 Oct 2015 11:47:12 -0400 Subject: [PATCH 10/26] Added pubsubIdentifier to StreamTopic --- lib/event_broker.js | 2 +- lib/stream_topic.js | 32 +++++++++++++++++++++++++------- test/test_driver.js | 1 - 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/lib/event_broker.js b/lib/event_broker.js index 17d1708..21178fa 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -75,7 +75,7 @@ EventBroker.prototype.client = function(client) { function generateQuery(topic) { return { name: topic.serverName, - topic: topic.deviceType + '/' + topic.deviceId + '/' + topic.streamName, + topic: topic.pubsubIdentifier() }; } client.on('subscribe', function(subscription) { diff --git a/lib/stream_topic.js b/lib/stream_topic.js index 76d1eec..29e0900 100644 --- a/lib/stream_topic.js +++ b/lib/stream_topic.js @@ -16,13 +16,14 @@ StreamTopic.prototype.parse = function(topicString){ var start = 0; var topicComponents = []; for(var i = 0; i < topicString.length; i++) { - currentCharacter = topicString[i]; - if(currentCharacter === '/' && previousCharacter !== '\\') { - topicComponents.push(topicString.slice(start, i)); - start = i + 1; - } else if(i === topicString.length - 1) { - topicComponents.push(topicString.slice(start, topicString.length)); - } + currentCharacter = topicString[i]; + if(currentCharacter === '/' && previousCharacter !== '\\') { + topicComponents.push(topicString.slice(start, i)); + start = i + 1; + } else if(i === topicString.length - 1) { + topicComponents.push(topicString.slice(start, topicString.length)); + } + previousCharacter = currentCharacter; } if (topicComponents.length < 3) { @@ -61,6 +62,23 @@ StreamTopic.prototype.hash = function() { return this._original; }; +StreamTopic.prototype.pubsubIdentifier = function() { + + function sanitizeRegex(part) { + if (part instanceof RegExp) { + return '{' + part.source + '}'; + } else { + return part; + } + } + + if (this._useComponents) { + return sanitizeRegex(this.deviceType) + '/' + sanitizeRegex(this.deviceId) + '/' + sanitizeRegex(this.streamName); + } else { + return this._original; + } +}; + StreamTopic.prototype.match = function(topic) { if (!this._useComponents) { return topic === this._original; diff --git a/test/test_driver.js b/test/test_driver.js index 4a3d306..2f3be5c 100644 --- a/test/test_driver.js +++ b/test/test_driver.js @@ -158,7 +158,6 @@ describe('Driver', function() { var recv = 0; pubsub.subscribe(topic, function(topic, msg) { - console.log(topic, msg) assert.ok(msg.timestamp); assert.ok(msg.topic); assert.ok(!msg.data); From 7d31a8f957c8c56df19f3e98f7cd89c5cd92ead1 Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Tue, 6 Oct 2015 13:32:45 -0400 Subject: [PATCH 11/26] Started to implement wildcard matching --- lib/stream_topic.js | 23 +++++++++++++++++++---- package.json | 1 + test/test_event_streams.js | 14 ++++++++++++-- test/test_stream_topic.js | 17 +++++++++++++++++ 4 files changed, 49 insertions(+), 6 deletions(-) diff --git a/lib/stream_topic.js b/lib/stream_topic.js index 29e0900..1ce7a1e 100644 --- a/lib/stream_topic.js +++ b/lib/stream_topic.js @@ -1,3 +1,5 @@ +var minimatch = require('minimatch'); + var StreamTopic = module.exports = function() { this.serverName = null; this.deviceType = null; @@ -79,11 +81,24 @@ StreamTopic.prototype.pubsubIdentifier = function() { } }; -StreamTopic.prototype.match = function(topic) { - if (!this._useComponents) { - return topic === this._original; +StreamTopic.prototype.match = function(topicString) { + if (this._useComponents) { + var components = [ this.serverName, this.deviceType, this.deviceId, this.streamName ]; + var checkTopic = StreamTopic.parse(topicString); + var checkComponents = [ checkTopic.serverName, checkTopic.deviceType, checkTopic.deviceId, checkTopic.streamName ]; + + return components.every(function(component, idx) { + if (component === undefined) { + return true; + } + if (component instanceof RegExp) { + return component.exec(checkComponents[idx]); + } else { + return minimatch(checkComponents[idx], component); + } + }); } else { - return topic === this._original; + return minimatch(topicString, this._original); } }; diff --git a/package.json b/package.json index fdc8547..8faa2fe 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ "levelup": "^0.18.5", "medea": "^1.0.0", "medeadown": "^1.1.8", + "minimatch": "^3.0.0", "node-uuid": "^1.4.1", "revolt": "^0.7.0", "rx": "~2.2.20", diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 8c5e4fe..542e650 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -123,12 +123,14 @@ describe('Event Streams', function() { ws.on('error', done); }); - itBoth('wildcard topic receives all messages for all topics', function(idx, done) { + describe.only('tmp', function() { + itBoth('wildcard topic for single peer receives all messages for all topics', function(idx, done) { var endpoint = urls[idx]; var ws = new WebSocket('ws://' + endpoint + baseUrl); var subscriptionId = null; var count = 0; - var topic = 'hub/led/*/state'; + var topic = 'hub/testdriver/*/state'; + var lastTopic = null; ws.on('open', function() { var msg = { type: 'subscribe', topic: topic }; ws.send(JSON.stringify(msg)); @@ -140,10 +142,17 @@ describe('Event Streams', function() { assert.equal(json.topic, topic); assert(json.subscriptionId); subscriptionId = json.subscriptionId; + + setTimeout(function() { + devices[0].call('change'); + devices[1].call('change'); + }, 50); } else { assert.equal(json.type, 'event'); assert(json.timestamp); assert(json.topic); + assert.notEqual(json.topic, lastTopic); + lastTopic = json.topic; assert.equal(json.subscriptionId, subscriptionId); assert(json.data); count++; @@ -155,6 +164,7 @@ describe('Event Streams', function() { }); ws.on('error', done); }); +}) itBoth('topic that doesnt exist still opens stream', function(idx, done) { var endpoint = urls[idx]; diff --git a/test/test_stream_topic.js b/test/test_stream_topic.js index 192d2cf..a101abf 100644 --- a/test/test_stream_topic.js +++ b/test/test_stream_topic.js @@ -45,5 +45,22 @@ describe('Stream Topic', function() { t.parse(topic); assert.equal(t.hash(), topic); }) + + describe.only('.match()', function() { + + function matchTest(query, topic, eval) { + it('should return ' + eval + ' for query ' + query + ' on topic ' + topic, function() { + var t = StreamTopic.parse(query); + assert.equal(t.match(topic), eval); + }) + } + + matchTest('led/123/*', 'led/123/state', true); + matchTest('led/321/*', 'led/123/state', false); + matchTest('led/**', 'led/123/state', true); + matchTest('{^Det.+$}/led/123/state', 'Detroit-123/led/123/state', true); + matchTest('{^Det.+$}/led/123/state', 'hub/led/123/state', false); + matchTest('{^Det.+$}/led/**', 'Detroit-123/led/123/stream', true); + }) }); From a395885b5df46d7a153d611d11e4704366d5d833 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Wed, 7 Oct 2015 09:43:57 -0400 Subject: [PATCH 12/26] Topic matching on stream messages. --- lib/event_broker.js | 6 +++- lib/pubsub_service.js | 1 - lib/stream_topic.js | 73 +++++++++++++++++++++++++++++++------- test/test_event_streams.js | 2 -- test/test_stream_topic.js | 7 ++-- 5 files changed, 70 insertions(+), 19 deletions(-) diff --git a/lib/event_broker.js b/lib/event_broker.js index 21178fa..0b76047 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -1,4 +1,5 @@ var querytopic = require('./query_topic'); +var StreamTopic = require('./stream_topic'); function topicMatch(a, b) { return a === b; @@ -38,7 +39,10 @@ EventBroker.prototype._setupPublishListener = function(peer, topic) { } this._publishListeners[peer.name][topic] = function(data) { - self._publish(topic, data); + streamTopic = StreamTopic.parse(topic); + + + self._publish(streamTopic, data); }; peer.on(topic, this._publishListeners[peer.name][topic]); diff --git a/lib/pubsub_service.js b/lib/pubsub_service.js index dd40757..3cf07dc 100644 --- a/lib/pubsub_service.js +++ b/lib/pubsub_service.js @@ -97,7 +97,6 @@ PubSub.prototype._onResponse = function(topic, data, env) { } else { console.error('PubSub._onResponse encoding not set.'); } - var stream = env.response.push('/' + topic.hash(), { 'Host': encodeURIComponent(serverId) + '.unreachable.zettajs.io', 'Content-Length': data.length, 'Content-Type': encoding diff --git a/lib/stream_topic.js b/lib/stream_topic.js index 1ce7a1e..d38796f 100644 --- a/lib/stream_topic.js +++ b/lib/stream_topic.js @@ -28,7 +28,7 @@ StreamTopic.prototype.parse = function(topicString){ previousCharacter = currentCharacter; } - if (topicComponents.length < 3) { + if (topicComponents.length < 3 && topicComponents.indexOf('**') === -1) { return; } @@ -55,9 +55,13 @@ StreamTopic.prototype.parse = function(topicString){ this.serverName = checkForRegex(topicComponents[0]); this.deviceType = checkForRegex(topicComponents[1]); this.deviceId = checkForRegex(topicComponents[2]); - var streamComponents = topicComponents[3].split('?'); - this.streamName = checkForRegex(streamComponents[0]); - this.streamQuery = streamComponents[1]; + if(topicComponents[3]) { + var streamComponents = topicComponents[3].split('?'); + this.streamName = checkForRegex(streamComponents[0]); + this.streamQuery = streamComponents[1]; + } else { + this.streamName = undefined; + } } StreamTopic.prototype.hash = function() { @@ -83,20 +87,63 @@ StreamTopic.prototype.pubsubIdentifier = function() { StreamTopic.prototype.match = function(topicString) { if (this._useComponents) { - var components = [ this.serverName, this.deviceType, this.deviceId, this.streamName ]; + var components = [ this.serverName, this.deviceType, this.deviceId, this.streamName ].filter(function(i) { return i !== undefined });; + var checkedComponents = []; + var topicStringComponents = []; var checkTopic = StreamTopic.parse(topicString); - var checkComponents = [ checkTopic.serverName, checkTopic.deviceType, checkTopic.deviceId, checkTopic.streamName ]; + var checkComponents = [ checkTopic.serverName, checkTopic.deviceType, checkTopic.deviceId, checkTopic.streamName ].filter(function(i) { return i !== undefined }); + var matchStart = null; + + //{^Det.+$}/led/** + //[RegExp, String] + //['Detroit-123', 'led/123/state'] + //RegExp -> 'Detroit-123' && String -> 'led/123/state' - return components.every(function(component, idx) { - if (component === undefined) { - return true; - } + + //{^Det.+$}/led/*/{^sta.+$} + //[RegExp, String, RegExp] + //['Detroit-123', 'led/123', 'state'] + //RegExp -> 'Detroit-123' && String -> 'led/123/state' && RegExp -> 'state' + + //{^Det.+$}/**/{^stream.+$} + //[RegExp, String, RegExp] + //['Detroit-123', 'led/123', 'stream-123'] + + components.forEach(function(component, idx) { if (component instanceof RegExp) { - return component.exec(checkComponents[idx]); - } else { - return minimatch(checkComponents[idx], component); + if(matchStart !== null) { + var checkedComponent = components.slice(matchStart, idx).join('/'); + checkedComponents.push(checkedComponent); + if(checkedComponent === '**' && components.length < 4) { + topicStringComponents.push(checkComponents.slice(matchStart, idx + 1).join('/')); + topicStringComponents.push(checkComponents[idx + 1]); + return; + } else { + topicStringComponents.push(checkComponents.slice(matchStart, idx).join('/')); + } + matchStart = null; + } + checkedComponents.push(component); + topicStringComponents.push(checkComponents[idx]); + } else if(component !== undefined) { + if(matchStart === null) { + matchStart = idx; + } + if(idx === components.length - 1) { + checkedComponents.push(components.slice(matchStart).join('/')); + topicStringComponents.push(checkComponents.slice(matchStart).join('/')); + } } }); + + return checkedComponents.every(function(component, idx) { + var topicComponent = topicStringComponents[idx]; + if(component instanceof RegExp) { + return component.exec(topicComponent); + } else { + return minimatch(topicComponent, component); + } + }); } else { return minimatch(topicString, this._original); } diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 542e650..322c2fe 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -123,7 +123,6 @@ describe('Event Streams', function() { ws.on('error', done); }); - describe.only('tmp', function() { itBoth('wildcard topic for single peer receives all messages for all topics', function(idx, done) { var endpoint = urls[idx]; var ws = new WebSocket('ws://' + endpoint + baseUrl); @@ -164,7 +163,6 @@ describe('Event Streams', function() { }); ws.on('error', done); }); -}) itBoth('topic that doesnt exist still opens stream', function(idx, done) { var endpoint = urls[idx]; diff --git a/test/test_stream_topic.js b/test/test_stream_topic.js index a101abf..40059f9 100644 --- a/test/test_stream_topic.js +++ b/test/test_stream_topic.js @@ -46,7 +46,7 @@ describe('Stream Topic', function() { assert.equal(t.hash(), topic); }) - describe.only('.match()', function() { + describe('.match()', function() { function matchTest(query, topic, eval) { it('should return ' + eval + ' for query ' + query + ' on topic ' + topic, function() { @@ -61,6 +61,9 @@ describe('Stream Topic', function() { matchTest('{^Det.+$}/led/123/state', 'Detroit-123/led/123/state', true); matchTest('{^Det.+$}/led/123/state', 'hub/led/123/state', false); matchTest('{^Det.+$}/led/**', 'Detroit-123/led/123/stream', true); - }) + matchTest('{^Det.+$}/led/123/{^stream.+$}', 'Detroit-123/led/123/stream-123', true); + matchTest('{^Det.+$}/**/{^stream.+$}', 'Detroit-123/led/123/stream-123', true); + matchTest('{^Det.+$}/**', 'Detroit-123/led/123/stream', true); + }); }); From 4c1c9edd9086e46666d6c1e3688a8f345ad8134b Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Wed, 7 Oct 2015 11:04:51 -0400 Subject: [PATCH 13/26] Progress on wild card server topics. --- lib/event_broker.js | 65 ++++++++++++++++++++++++++++++++------ test/test_event_streams.js | 47 ++++++++++++++++++++++++--- 2 files changed, 98 insertions(+), 14 deletions(-) diff --git a/lib/event_broker.js b/lib/event_broker.js index 0b76047..4e4a4e4 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -42,7 +42,7 @@ EventBroker.prototype._setupPublishListener = function(peer, topic) { streamTopic = StreamTopic.parse(topic); - self._publish(streamTopic, data); + self._publish(streamTopic, data, peer.name); }; peer.on(topic, this._publishListeners[peer.name][topic]); @@ -87,9 +87,47 @@ EventBroker.prototype.client = function(client) { query.subscriptionId = subscription.subscriptionId; query.limit = subscription.limit; query.count = 0; - - self._subscribe(query); client.query.push(query); + var connectedPeers = []; + + var subscribeToPeer = function(peerName) { + var copiedQuery = {}; + Object.keys(query).forEach(function(key) { + copiedQuery[key] = query[key]; + }); + + if(peerName) { + copiedQuery.name = peerName; + } + + if(connectedPeers.indexOf(copiedQuery.name) === -1) { + connectedPeers.push(copiedQuery.name); + + if(query.name instanceof RegExp && !query.name.exec(copiedQuery.name)) { + return; + } + self._subscribe(copiedQuery); + } + } + + if(query.name instanceof RegExp || query.name === '*') { + + self.zetta.pubsub.subscribe('_peer/connect', function(topic, data) { + subscribeToPeer(data.peer.name); + }); + + Object.keys(self.peers).forEach(subscribeToPeer); + console.log(self.zetta._name); + subscribeToPeer(self.zetta._name); + + } else { + subscribeToPeer(); + } + + //listen peer connect events + //iterate through current peers + //array of peers that have been given the topic + }); client.on('unsubscribe', function(subscription) { @@ -195,16 +233,22 @@ EventBroker.prototype._unsubscribe = function(query) { } }; -EventBroker.prototype._publish = function(topic, data) { +EventBroker.prototype._publish = function(topic, data, peerName) { var self = this; this.clients.forEach(function(client) { + console.log('topic', topic); client.query.forEach(function(query) { + console.log(query.topic); if (topic.match(query.topic)) { - + + var copiedData = {}; + Object.keys(data).forEach(function(key) { + copiedData[key] = data[key]; + }); if (client.streamEnabled) { - data.type = 'event'; - data.topic = query.name + '/' + data.topic; - data.subscriptionId = query.subscriptionId; + copiedData.type = 'event'; + copiedData.topic = peerName + '/' + data.topic; + copiedData.subscriptionId = query.subscriptionId; } query.count++; @@ -215,18 +259,19 @@ EventBroker.prototype._publish = function(topic, data) { return; } - client.send(topic, data, function(err){ + client.send(topic, copiedData, function(err){ if (err) { console.error('ws error: '+err); } }); + } }); }); }; EventBroker.prototype._onLocalPubsub = function(topic, data) { - this._publish(topic, data); + this._publish(topic, data, this.zetta._name); }; diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 322c2fe..77c1540 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -123,6 +123,42 @@ describe('Event Streams', function() { ws.on('error', done); }); + itBoth('wildcard server topic subscription only receives messages with that topic', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var topic = validTopics[0]; + topic = topic.replace('hub', '*'); + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + + setTimeout(function() { + devices[0].call('change'); + }, 50); + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + console.log(json); + assert.equal(json.topic, validTopics[0]); + assert.equal(json.subscriptionId, subscriptionId); + assert(json.data); + done(); + } + }); + }); + ws.on('error', done); + }); + + itBoth('wildcard topic for single peer receives all messages for all topics', function(idx, done) { var endpoint = urls[idx]; var ws = new WebSocket('ws://' + endpoint + baseUrl); @@ -189,9 +225,8 @@ describe('Event Streams', function() { var subscriptionId = null; var count = 0; var ackCount = 0; - var topicOne = 'hub/led/*/state'; - var topicTwo = 'hub/led/1234/state'; - var data = null; + var topicOne = validTopics[0]; + var topicTwo = 'hub/testdriver/*/state'; ws.on('open', function() { var msgOne = { type: 'subscribe', topic: topicOne }; var msgTwo = { type: 'subscribe', topic: topicTwo }; @@ -206,12 +241,16 @@ describe('Event Streams', function() { assert(json.subscriptionId); subscriptionId = json.subscriptionId; ackCount++; + setTimeout(function() { + for(var i=0; i<11; i++) { + devices[0].call((i % 2 === 0) ? 'change' : 'prepare'); + } + }, 50); } else { assert.equal(json.type, 'event'); assert(json.timestamp); assert(json.topic); assert(json.subscriptionId); - assert.equal(json.data, data); count++; if(count === 2) { assert.equal(ackCount, 2); From 7d50e16136f03837e1494dbd538f30be8c344fab Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Wed, 7 Oct 2015 11:53:02 -0400 Subject: [PATCH 14/26] event broker keeps the original topic, fixed domain test --- lib/event_broker.js | 23 +++++--------- test/test_event_streams.js | 63 +++++++++++++++++++++++++++++++++++++- test/test_zetta.js | 1 + 3 files changed, 70 insertions(+), 17 deletions(-) diff --git a/lib/event_broker.js b/lib/event_broker.js index 4e4a4e4..1b9263e 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -110,16 +110,13 @@ EventBroker.prototype.client = function(client) { } } - if(query.name instanceof RegExp || query.name === '*') { - + if(query.name instanceof RegExp || query.name === '*') { self.zetta.pubsub.subscribe('_peer/connect', function(topic, data) { subscribeToPeer(data.peer.name); }); Object.keys(self.peers).forEach(subscribeToPeer); - console.log(self.zetta._name); subscribeToPeer(self.zetta._name); - } else { subscribeToPeer(); } @@ -235,20 +232,15 @@ EventBroker.prototype._unsubscribe = function(query) { EventBroker.prototype._publish = function(topic, data, peerName) { var self = this; + var originalTopic = data.topic; this.clients.forEach(function(client) { - console.log('topic', topic); client.query.forEach(function(query) { - console.log(query.topic); if (topic.match(query.topic)) { - - var copiedData = {}; - Object.keys(data).forEach(function(key) { - copiedData[key] = data[key]; - }); + if (client.streamEnabled) { - copiedData.type = 'event'; - copiedData.topic = peerName + '/' + data.topic; - copiedData.subscriptionId = query.subscriptionId; + data.type = 'event'; + data.topic = peerName + '/' + originalTopic; + data.subscriptionId = query.subscriptionId; } query.count++; @@ -259,12 +251,11 @@ EventBroker.prototype._publish = function(topic, data, peerName) { return; } - client.send(topic, copiedData, function(err){ + client.send(topic, data, function(err){ if (err) { console.error('ws error: '+err); } }); - } }); }); diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 77c1540..366569a 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -123,6 +123,68 @@ describe('Event Streams', function() { ws.on('error', done); }); + itBoth('multiple clients specific topic subscription only receives messages with that topic', function(idx, done) { + var endpoint = urls[idx]; + var topic = validTopics[0]; + + var connected = 0; + var recv = 0; + + var ws1 = new WebSocket('ws://' + endpoint + baseUrl); + ws1.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws1.send(JSON.stringify(msg)); + var subscriptionId = null; + ws1.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + connected++; + subscriptionId = json.subscriptionId; + if (connected === 2) { + setTimeout(function() { + devices[0].call('change'); + }, 50); + } + } else { + assert.equal(json.topic, topic); + assert.equal(json.subscriptionId, subscriptionId); + recv++; + if (recv === 2) { + done(); + } + } + }); + }); + ws1.on('error', done); + + var ws2 = new WebSocket('ws://' + endpoint + baseUrl); + ws2.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws2.send(JSON.stringify(msg)); + var subscriptionId = null; + ws2.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + subscriptionId = json.subscriptionId; + connected++; + if (connected === 2) { + setTimeout(function() { + devices[0].call('change'); + }, 50); + } + } else { + assert.equal(json.topic, topic); + assert.equal(json.subscriptionId, subscriptionId); + recv++; + if (recv === 2) { + done(); + } + } + }); + }); + ws2.on('error', done); + }); + itBoth('wildcard server topic subscription only receives messages with that topic', function(idx, done) { var endpoint = urls[idx]; var ws = new WebSocket('ws://' + endpoint + baseUrl); @@ -147,7 +209,6 @@ describe('Event Streams', function() { } else { assert.equal(json.type, 'event'); assert(json.timestamp); - console.log(json); assert.equal(json.topic, validTopics[0]); assert.equal(json.subscriptionId, subscriptionId); assert(json.data); diff --git a/test/test_zetta.js b/test/test_zetta.js index 7aa54a7..d32ed94 100644 --- a/test/test_zetta.js +++ b/test/test_zetta.js @@ -42,6 +42,7 @@ describe('Zetta', function() { var d = require('domain').create(); d.on('error', function(err) { assert.equal(err.message, '123'); + d.dispose() done(); }); d.run(function() { From 52dfb6483133d7091848a01047e326dbca7f330d Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Wed, 7 Oct 2015 12:01:50 -0400 Subject: [PATCH 15/26] Added event stream test for multiple hubs --- test/test_event_streams.js | 55 +++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 7 deletions(-) diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 366569a..1684331 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -18,17 +18,19 @@ describe('Event Streams', function() { cluster = zettacluster({ zetta: zetta }) .server('cloud') .server('hub', [Driver, Driver], ['cloud']) + .server('hub2', [Driver, Driver], ['cloud']) .on('ready', function() { app = cluster.servers['cloud']; urls.push('localhost:' + cluster.servers['cloud']._testPort); urls.push('localhost:' + cluster.servers['hub']._testPort); - Object.keys(cluster.servers['hub'].runtime._jsDevices).forEach(function(id) { - var device = cluster.servers['hub'].runtime._jsDevices[id]; - devices.push(device); - validTopics.push('hub/' + device.type + '/' + device.id + '/state'); - }); - + ['hub', 'hub2'].forEach(function(hubname) { + Object.keys(cluster.servers[hubname].runtime._jsDevices).forEach(function(id) { + var device = cluster.servers[hubname].runtime._jsDevices[id]; + devices.push(device); + validTopics.push(hubname + '/' + device.type + '/' + device.id + '/state'); + }); + }) done(); }) .run(function(err){ @@ -217,7 +219,46 @@ describe('Event Streams', function() { }); }); ws.on('error', done); - }); + }); + + + it('wildcard server topic subscription receives messages from both hubs', function(done) { + var endpoint = urls[0]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var topic = '*/testdriver/*/state'; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + var recv = 0; + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + + setTimeout(function() { + devices[0].call('change'); + devices[2].call('change'); + }, 50); + } else { + recv++; + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert.equal(json.subscriptionId, subscriptionId); + assert(json.data); + if (recv === 2) { + done(); + } + } + }); + }); + ws.on('error', done); + }); itBoth('wildcard topic for single peer receives all messages for all topics', function(idx, done) { From f04af80ff9e0c1e92a37de48d220fd30d554d234 Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Mon, 12 Oct 2015 12:17:43 -0400 Subject: [PATCH 16/26] Updated streamtopic to create caql query from stream query. Eventbroker uses calypso-memory to for topic queries. --- lib/event_broker.js | 56 ++++++++++++++++++++++++-------------- lib/stream_topic.js | 6 +++- package.json | 1 + test/test_event_streams.js | 34 +++++++++++++++++++++++ 4 files changed, 76 insertions(+), 21 deletions(-) diff --git a/lib/event_broker.js b/lib/event_broker.js index 1b9263e..167d3b2 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -1,10 +1,7 @@ +var MemorySession = require('calypso-memory/session'); var querytopic = require('./query_topic'); var StreamTopic = require('./stream_topic'); -function topicMatch(a, b) { - return a === b; -} - var EventBroker = module.exports = function(zetta) { this.peers = {}; this.zetta = zetta; @@ -15,6 +12,8 @@ var EventBroker = module.exports = function(zetta) { this._publishListeners = {}; // {: {: _listner } } this._deviceQueries = {}; + + this.caqlSession = MemorySession.create({ data: {} }); }; EventBroker.prototype.peer = function(peer) { @@ -87,6 +86,7 @@ EventBroker.prototype.client = function(client) { query.subscriptionId = subscription.subscriptionId; query.limit = subscription.limit; query.count = 0; + query.caql = subscription.topic.streamQuery; client.query.push(query); var connectedPeers = []; @@ -237,25 +237,41 @@ EventBroker.prototype._publish = function(topic, data, peerName) { client.query.forEach(function(query) { if (topic.match(query.topic)) { - if (client.streamEnabled) { - data.type = 'event'; - data.topic = peerName + '/' + originalTopic; - data.subscriptionId = query.subscriptionId; - } + var sendToClient = function() { - query.count++; - if (typeof query.limit === 'number' && query.count > query.limit) { - // unsubscribe broker - self._unsubscribe(query); - client._unsubscribe(query.subscriptionId) - return; - } + if (client.streamEnabled) { + data.type = 'event'; + data.topic = peerName + '/' + originalTopic; + data.subscriptionId = query.subscriptionId; - client.send(topic, data, function(err){ - if (err) { - console.error('ws error: '+err); + query.count++; + if (typeof query.limit === 'number' && query.count > query.limit) { + // unsubscribe broker + self._unsubscribe(query); + client._unsubscribe(query.subscriptionId) + return; + } } - }); + client.send(topic, data, function(err){ + if (err) { + console.error('ws error: '+err); + } + }); + }; + + if (client.streamEnabled) { + if (query.caql) { + self.caqlSession.data.data = { 0: data }; + self.caqlSession.find(query.caql, function(err, results) { + if (results.length === 1) { + data = results[0]; + sendToClient(); + } + }) + return; + } + } + sendToClient(); } }); }); diff --git a/lib/stream_topic.js b/lib/stream_topic.js index d38796f..2c605c9 100644 --- a/lib/stream_topic.js +++ b/lib/stream_topic.js @@ -1,4 +1,6 @@ var minimatch = require('minimatch'); +var calypso = require('calypso'); +var Query = calypso.Query; var StreamTopic = module.exports = function() { this.serverName = null; @@ -58,7 +60,9 @@ StreamTopic.prototype.parse = function(topicString){ if(topicComponents[3]) { var streamComponents = topicComponents[3].split('?'); this.streamName = checkForRegex(streamComponents[0]); - this.streamQuery = streamComponents[1]; + if (streamComponents[1]) { + this.streamQuery = Query.of('data').ql(streamComponents[1]); + } } else { this.streamName = undefined; } diff --git a/package.json b/package.json index 8faa2fe..71e776a 100644 --- a/package.json +++ b/package.json @@ -10,6 +10,7 @@ "async": "^0.9.0", "calypso": "^1.0.0", "calypso-level": "^0.5.0", + "calypso-memory": "^0.4.0", "calypso-query-decompiler": "^0.4.0", "colors": "~0.6.2", "levelup": "^0.18.5", diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 1684331..bfea25c 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -450,6 +450,40 @@ describe('Event Streams', function() { ws.on('error', done); }); + itBoth('query field selector should only return properties in selection', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/testdriver/' + devices[0].id + '/bar?select data where data >= 1'; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + setTimeout(function() { + devices[0].incrementStreamValue(); + }, 50); + } else if(json.type === 'event') { + console.log(json) + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId, subscriptionId); + assert(json.data);; + done(); + } + }); + }); + ws.on('error', done); + }); + }); describe('SPDY API', function() { From 9186e2b4dcf890ed1e1ddab4908df3abe02400f8 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Mon, 12 Oct 2015 09:53:54 -0700 Subject: [PATCH 17/26] Select statements will only work on the value of the data property in stream message. Fixed tests accordingly. --- lib/event_broker.js | 4 ++-- test/test_event_streams.js | 3 +-- test/test_stream_topic.js | 3 ++- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/event_broker.js b/lib/event_broker.js index 167d3b2..38ac7ce 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -261,10 +261,10 @@ EventBroker.prototype._publish = function(topic, data, peerName) { if (client.streamEnabled) { if (query.caql) { - self.caqlSession.data.data = { 0: data }; + self.caqlSession.data.data = { 0: { data: data.data} }; self.caqlSession.find(query.caql, function(err, results) { if (results.length === 1) { - data = results[0]; + data.data = results[0][Object.keys(results[0])[0]]; sendToClient(); } }) diff --git a/test/test_event_streams.js b/test/test_event_streams.js index bfea25c..c0cdeba 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -471,12 +471,11 @@ describe('Event Streams', function() { devices[0].incrementStreamValue(); }, 50); } else if(json.type === 'event') { - console.log(json) assert.equal(json.type, 'event'); assert(json.timestamp); assert(json.topic); assert(json.subscriptionId, subscriptionId); - assert(json.data);; + assert(json.data); done(); } }); diff --git a/test/test_stream_topic.js b/test/test_stream_topic.js index 40059f9..842fd9e 100644 --- a/test/test_stream_topic.js +++ b/test/test_stream_topic.js @@ -1,5 +1,6 @@ var StreamTopic = require('../lib/stream_topic'); var assert = require('assert'); +var Query = require('calypso').Query; describe('Stream Topic', function() { it('will correctly parse a topic of all valid strings', function() { @@ -27,7 +28,7 @@ describe('Stream Topic', function() { assert.equal(t.deviceType, 'led'); assert.equal(t.deviceId, '1234'); assert.equal(t.streamName, 'state'); - assert.equal(t.streamQuery, 'select * where data > 80'); + assert(t.streamQuery instanceof Query); }); it('will correctly parse topics without the leading server name', function() { From 9e293231ac434a2383e07121d418a9bc272c2c10 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Mon, 12 Oct 2015 13:12:29 -0700 Subject: [PATCH 18/26] Testing select statements and where queries. --- lib/event_broker.js | 3 +++ test/fixture/example_driver.js | 13 +++++++++++ test/test_event_streams.js | 40 ++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+) diff --git a/lib/event_broker.js b/lib/event_broker.js index 38ac7ce..ad21c1f 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -262,7 +262,10 @@ EventBroker.prototype._publish = function(topic, data, peerName) { if (client.streamEnabled) { if (query.caql) { self.caqlSession.data.data = { 0: { data: data.data} }; + console.log('session ', self.caqlSession.data.data); + console.log('eb ',data.data); self.caqlSession.find(query.caql, function(err, results) { + console.log(arguments); if (results.length === 1) { data.data = results[0][Object.keys(results[0])[0]]; sendToClient(); diff --git a/test/fixture/example_driver.js b/test/fixture/example_driver.js index 8c9ed18..73c60fe 100644 --- a/test/fixture/example_driver.js +++ b/test/fixture/example_driver.js @@ -27,6 +27,7 @@ TestDriver.prototype.init = function(config) { .monitor('foo') .stream('bar', this.streamBar) .stream('foobar', this.streamFooBar, {binary: true}) + .stream('fooobject', this.streamObject) .map('test-number', function(x, cb) { cb(); }, [{ name: 'value', type: 'number'}]) .map('test-text', function(x, cb) { cb(); }, [{ name: 'value', type: 'text'}]) .map('test-none', function(x, cb) { cb(); }, [{ name: 'value'}]) @@ -48,6 +49,11 @@ TestDriver.prototype.prepare = function(cb) { cb(); }; +TestDriver.prototype.streamObject = function(stream) { + console.log('set stream object'); + this._streamObject = stream; +}; + TestDriver.prototype.returnError = function(error, cb) { cb(new Error(error)); }; @@ -59,6 +65,13 @@ TestDriver.prototype.incrementStreamValue = function() { } } +TestDriver.prototype.publishStreamObject = function(obj) { + if(this._streamObject) { + console.log('exists write : ', obj); + this._streamObject.write(obj); + } +}; + TestDriver.prototype.streamBar = function(stream) { this._stream = stream; } diff --git a/test/test_event_streams.js b/test/test_event_streams.js index c0cdeba..91dca60 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -483,6 +483,46 @@ describe('Event Streams', function() { ws.on('error', done); }); + describe.only('tmp', function() { + itBoth('query field selector * should only return properties in selection', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?select * where data.data > 1'; + var data = {data: 2, foo: 'bar', quux: 2}; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + console.log(json); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + setTimeout(function() { + devices[0].publishStreamObject(data); + }, 50); + } else if(json.type === 'event') { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId, subscriptionId); + assert(json.data); + assert.equal(json.data.data, 2); + assert.equal(json.data.foo, 'bar'); + assert.equal(json.data.quux, 1); + done(); + } + }); + }); + ws.on('error', done); + }); + }); + }); describe('SPDY API', function() { From d1c281f4cb5757c2c050126955de07bd0b79d0f9 Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Mon, 12 Oct 2015 17:05:45 -0400 Subject: [PATCH 19/26] Removed calypso, using caql-js-compiler for stream queries --- lib/event_broker.js | 33 +++++++++++------------ lib/stream_topic.js | 4 +-- package.json | 1 - test/fixture/example_driver.js | 2 -- test/test_event_streams.js | 48 +++++++++++++++++++++++++++------- test/test_stream_topic.js | 2 +- 6 files changed, 56 insertions(+), 34 deletions(-) diff --git a/lib/event_broker.js b/lib/event_broker.js index ad21c1f..75e5760 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -1,4 +1,4 @@ -var MemorySession = require('calypso-memory/session'); +var JSCompiler = require('caql-js-compiler'); var querytopic = require('./query_topic'); var StreamTopic = require('./stream_topic'); @@ -13,7 +13,7 @@ var EventBroker = module.exports = function(zetta) { this._publishListeners = {}; // {: {: _listner } } this._deviceQueries = {}; - this.caqlSession = MemorySession.create({ data: {} }); + this._queryCache = {}; }; EventBroker.prototype.peer = function(peer) { @@ -230,9 +230,9 @@ EventBroker.prototype._unsubscribe = function(query) { } }; -EventBroker.prototype._publish = function(topic, data, peerName) { +EventBroker.prototype._publish = function(topic, msg, peerName) { var self = this; - var originalTopic = data.topic; + var originalTopic = msg.topic; this.clients.forEach(function(client) { client.query.forEach(function(query) { if (topic.match(query.topic)) { @@ -240,9 +240,9 @@ EventBroker.prototype._publish = function(topic, data, peerName) { var sendToClient = function() { if (client.streamEnabled) { - data.type = 'event'; - data.topic = peerName + '/' + originalTopic; - data.subscriptionId = query.subscriptionId; + msg.type = 'event'; + msg.topic = peerName + '/' + originalTopic; + msg.subscriptionId = query.subscriptionId; query.count++; if (typeof query.limit === 'number' && query.count > query.limit) { @@ -252,7 +252,7 @@ EventBroker.prototype._publish = function(topic, data, peerName) { return; } } - client.send(topic, data, function(err){ + client.send(topic, msg, function(err){ if (err) { console.error('ws error: '+err); } @@ -261,16 +261,13 @@ EventBroker.prototype._publish = function(topic, data, peerName) { if (client.streamEnabled) { if (query.caql) { - self.caqlSession.data.data = { 0: { data: data.data} }; - console.log('session ', self.caqlSession.data.data); - console.log('eb ',data.data); - self.caqlSession.find(query.caql, function(err, results) { - console.log(arguments); - if (results.length === 1) { - data.data = results[0][Object.keys(results[0])[0]]; - sendToClient(); - } - }) + var compiler = new JSCompiler(); + var compiled = compiler.compile(query.caql); + var result = compiled.filterOne({ data: msg.data }); + if (result) { + msg.data = result[Object.keys(result)[0]]; + sendToClient(); + } return; } } diff --git a/lib/stream_topic.js b/lib/stream_topic.js index 2c605c9..38368f4 100644 --- a/lib/stream_topic.js +++ b/lib/stream_topic.js @@ -1,6 +1,4 @@ var minimatch = require('minimatch'); -var calypso = require('calypso'); -var Query = calypso.Query; var StreamTopic = module.exports = function() { this.serverName = null; @@ -61,7 +59,7 @@ StreamTopic.prototype.parse = function(topicString){ var streamComponents = topicComponents[3].split('?'); this.streamName = checkForRegex(streamComponents[0]); if (streamComponents[1]) { - this.streamQuery = Query.of('data').ql(streamComponents[1]); + this.streamQuery = streamComponents[1]; } } else { this.streamName = undefined; diff --git a/package.json b/package.json index 71e776a..8faa2fe 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,6 @@ "async": "^0.9.0", "calypso": "^1.0.0", "calypso-level": "^0.5.0", - "calypso-memory": "^0.4.0", "calypso-query-decompiler": "^0.4.0", "colors": "~0.6.2", "levelup": "^0.18.5", diff --git a/test/fixture/example_driver.js b/test/fixture/example_driver.js index 73c60fe..87e4bf4 100644 --- a/test/fixture/example_driver.js +++ b/test/fixture/example_driver.js @@ -50,7 +50,6 @@ TestDriver.prototype.prepare = function(cb) { }; TestDriver.prototype.streamObject = function(stream) { - console.log('set stream object'); this._streamObject = stream; }; @@ -67,7 +66,6 @@ TestDriver.prototype.incrementStreamValue = function() { TestDriver.prototype.publishStreamObject = function(obj) { if(this._streamObject) { - console.log('exists write : ', obj); this._streamObject.write(obj); } }; diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 91dca60..1edc72b 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -483,20 +483,18 @@ describe('Event Streams', function() { ws.on('error', done); }); - describe.only('tmp', function() { - itBoth('query field selector * should only return properties in selection', function(idx, done){ + itBoth('query field selector * should all properties in selection', function(idx, done){ var endpoint = urls[idx]; var ws = new WebSocket('ws://' + endpoint + baseUrl); var subscriptionId = null; var count = 0; - var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?select * where data.data > 1'; - var data = {data: 2, foo: 'bar', quux: 2}; + var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?select * where data.val >= 2'; + var data = { foo: 'bar', val: 2 }; ws.on('open', function() { var msg = { type: 'subscribe', topic: topic }; ws.send(JSON.stringify(msg)); ws.on('message', function(buffer) { var json = JSON.parse(buffer); - console.log(json); if(json.type === 'subscribe-ack') { assert.equal(json.type, 'subscribe-ack'); assert(json.timestamp); @@ -507,20 +505,52 @@ describe('Event Streams', function() { devices[0].publishStreamObject(data); }, 50); } else if(json.type === 'event') { - assert.equal(json.type, 'event'); assert(json.timestamp); assert(json.topic); assert(json.subscriptionId, subscriptionId); assert(json.data); - assert.equal(json.data.data, 2); + assert.equal(json.data.val, 2); assert.equal(json.data.foo, 'bar'); - assert.equal(json.data.quux, 1); done(); } }); }); ws.on('error', done); - }); + }); + + itBoth('query field selector should return only selected properties', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?select data.val'; + var data = { foo: 'bar', val: 2 }; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + setTimeout(function() { + devices[0].publishStreamObject(data); + }, 50); + } else if(json.type === 'event') { + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId, subscriptionId); + assert(json.data); + assert.equal(json.data.val, 2); + assert.equal(json.data.foo, undefined); + done(); + } + }); + }); + ws.on('error', done); }); }); diff --git a/test/test_stream_topic.js b/test/test_stream_topic.js index 842fd9e..3b4fa34 100644 --- a/test/test_stream_topic.js +++ b/test/test_stream_topic.js @@ -28,7 +28,7 @@ describe('Stream Topic', function() { assert.equal(t.deviceType, 'led'); assert.equal(t.deviceId, '1234'); assert.equal(t.streamName, 'state'); - assert(t.streamQuery instanceof Query); + assert.equal(t.streamQuery, 'select * where data > 80'); }); it('will correctly parse topics without the leading server name', function() { From a40fe2a685c35fe7f7287be80dd1541507e3081c Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Mon, 12 Oct 2015 17:06:21 -0400 Subject: [PATCH 20/26] Added caql-js-compiler to package.json --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index 8faa2fe..fae2663 100644 --- a/package.json +++ b/package.json @@ -11,6 +11,7 @@ "calypso": "^1.0.0", "calypso-level": "^0.5.0", "calypso-query-decompiler": "^0.4.0", + "caql-js-compiler": "^0.5.0", "colors": "~0.6.2", "levelup": "^0.18.5", "medea": "^1.0.0", From a22db19c0dc5ad451aafa5f041d553fd016da853 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Mon, 12 Oct 2015 14:32:45 -0700 Subject: [PATCH 21/26] Cache compiled caql queries for your health. --- lib/event_broker.js | 28 +++++++++++----------------- lib/event_socket.js | 24 ++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/lib/event_broker.js b/lib/event_broker.js index 75e5760..e2f45b9 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -233,12 +233,11 @@ EventBroker.prototype._unsubscribe = function(query) { EventBroker.prototype._publish = function(topic, msg, peerName) { var self = this; var originalTopic = msg.topic; + this.clients.forEach(function(client) { client.query.forEach(function(query) { if (topic.match(query.topic)) { - var sendToClient = function() { - if (client.streamEnabled) { msg.type = 'event'; msg.topic = peerName + '/' + originalTopic; @@ -251,27 +250,22 @@ EventBroker.prototype._publish = function(topic, msg, peerName) { client._unsubscribe(query.subscriptionId) return; } + if (query.caql) { + + var compiled = client._queryCache[query.caql]; + var result = compiled.filterOne({ data: msg.data }); + if (result) { + msg.data = result[Object.keys(result)[0]]; + } else { + return; + } + } } client.send(topic, msg, function(err){ if (err) { console.error('ws error: '+err); } }); - }; - - if (client.streamEnabled) { - if (query.caql) { - var compiler = new JSCompiler(); - var compiled = compiler.compile(query.caql); - var result = compiled.filterOne({ data: msg.data }); - if (result) { - msg.data = result[Object.keys(result)[0]]; - sendToClient(); - } - return; - } - } - sendToClient(); } }); }); diff --git a/lib/event_socket.js b/lib/event_socket.js index 2792767..17ed1c5 100644 --- a/lib/event_socket.js +++ b/lib/event_socket.js @@ -5,6 +5,7 @@ var EventStreamsParser = require('./event_streams_parser'); var StreamTopic = require('./stream_topic'); var buildDeviceActions = require('./api_formats/siren/device.siren').buildActions; var deviceFormatter = require('./api_formats/siren/device.siren'); +var JSCompiler = require('caql-js-compiler'); //Flag to indicate that we expect data back on teh websocket //Tracking subscriptions @@ -12,6 +13,7 @@ var EventSocket = module.exports = function(ws, query, streamEnabled) { EventEmitter.call(this); this.ws = ws; this.query = []; + this._queryCache = {}; // list of event streams this._subscriptions = []; @@ -43,12 +45,30 @@ var EventSocket = module.exports = function(ws, query, streamEnabled) { code: 400, timestamp: new Date().getTime(), topic: msg.topic, - message: err + message: err.message }; self.ws.send(JSON.stringify(msg)); return; } + if(topic.streamQuery && !self._queryCache[topic.streamQuery]) { + try { + var compiler = new JSCompiler(); + var compiled = compiler.compile(topic.streamQuery); + self._queryCache[topic.streamQuery] = compiled; + } catch(err) { + var msg = { + type: 'error', + code: 400, + timestamp: new Date().getTime(), + topic: msg.topic, + message: err.message + } + self.ws.send(JSON.stringify(msg)); + return; + } + } + var subscription = { subscriptionId: ++self._subscriptionIndex, topic: topic, limit: msg.limit }; self._subscriptions.push(subscription); @@ -95,7 +115,7 @@ EventSocket.prototype._unsubscribe = function(subscriptionId, cb) { type: 'error', code: 405, timestamp: new Date().getTime(), - message: new Error('Unable to unsubscribe from invalid subscriptionId') + message: (new Error('Unable to unsubscribe from invalid subscriptionId')).message }; self.ws.send(JSON.stringify(msg)); return; From 59f12422023da285f855b37485adf2bac2abe48e Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Thu, 15 Oct 2015 12:46:02 -0400 Subject: [PATCH 22/26] Added tests for client errors --- test/test_event_streams.js | 104 +++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 1edc72b..50f1162 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -553,6 +553,110 @@ describe('Event Streams', function() { ws.on('error', done); }); + describe('Protocol Errors', function() { + + itBoth('invalid stream query should result in a 400 error', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?invalid stream query'; + var data = { foo: 'bar', val: 2 }; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + done(); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert.equal(json.code, 400); + assert(json.message); + }); + }); + ws.on('error', done); + }); + + itBoth('invalid subscribe should result in a 400 error', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/testdriver/' + devices[0].id + '/fooobject'; + ws.on('open', function() { + var msg = { type: 'subscribe' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + done(); + assert(json.timestamp); + assert.equal(json.code, 400); + assert(json.message); + }); + }); + ws.on('error', done); + }); + + itBoth('unsubscribing from an invalid subscriptionId should result in a 400 error', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + ws.on('open', function() { + var msg = { type: 'unsubscribe', subscriptionId: 123 }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + done(); + assert(json.timestamp); + assert.equal(json.code, 405); + assert(json.message); + }); + }); + ws.on('error', done); + }); + + itBoth('unsubscribing from a missing subscriptionId should result in a 400 error', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + ws.on('open', function() { + var msg = { type: 'unsubscribe' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + done(); + assert(json.timestamp); + assert.equal(json.code, 400); + assert(json.message); + }); + }); + ws.on('error', done); + }); + + itBoth('on invalid message should result in a 400 error', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + ws.on('open', function() { + var msg = { test: 123 }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + done(); + assert(json.timestamp); + assert.equal(json.code, 400); + assert(json.message); + }); + }); + ws.on('error', done); + }); + + + }) + }); describe('SPDY API', function() { From 915be0dfbfcf98bbbbf72abd98adb7360061d907 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Thu, 22 Oct 2015 16:50:57 -0400 Subject: [PATCH 23/26] Updates to implementation to allow for _peer/connect and _peer/disconnect events to be listened for. --- lib/event_broker.js | 26 ++++-- lib/event_socket.js | 10 +++ lib/pubsub_service.js | 6 +- test/test_event_streams.js | 166 +++++++++++++++++++++++++++++++++++++ 4 files changed, 197 insertions(+), 11 deletions(-) diff --git a/lib/event_broker.js b/lib/event_broker.js index e2f45b9..ad7d42a 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -41,7 +41,7 @@ EventBroker.prototype._setupPublishListener = function(peer, topic) { streamTopic = StreamTopic.parse(topic); - self._publish(streamTopic, data, peer.name); + self._publish(streamTopic, null, data, peer.name); }; peer.on(topic, this._publishListeners[peer.name][topic]); @@ -108,6 +108,7 @@ EventBroker.prototype.client = function(client) { } self._subscribe(copiedQuery); } + } if(query.name instanceof RegExp || query.name === '*') { @@ -150,7 +151,7 @@ EventBroker.prototype._subscribe = function(query) { var topic = query.topic; // is local - if (query.name === this.zetta.id) { + if (query.name === this.zetta.id || !query.name) { if (!this.subscriptions[topic]) { this.subscriptions[topic] = { count: 0, listener: null }; } @@ -186,7 +187,7 @@ EventBroker.prototype._subscribe = function(query) { EventBroker.prototype._unsubscribe = function(query) { var topic = query.topic; - if (query.name === this.zetta.id) { + if (query.name === this.zetta.id || !query.name) { this.subscriptions[topic].count--; if (this.subscriptions[topic].count > 0) { return; @@ -230,17 +231,25 @@ EventBroker.prototype._unsubscribe = function(query) { } }; -EventBroker.prototype._publish = function(topic, msg, peerName) { +EventBroker.prototype._publish = function(topic, sourceTopic, msg, peerName) { var self = this; var originalTopic = msg.topic; - + this.clients.forEach(function(client) { client.query.forEach(function(query) { if (topic.match(query.topic)) { if (client.streamEnabled) { msg.type = 'event'; - msg.topic = peerName + '/' + originalTopic; + if(topic._useComponents) { + msg.topic = peerName + '/' + originalTopic; + } else { + if(topic._original.indexOf('*') === -1) { + msg.topic = topic._original; + } else { + msg.topic = sourceTopic; + } + } msg.subscriptionId = query.subscriptionId; query.count++; @@ -261,6 +270,7 @@ EventBroker.prototype._publish = function(topic, msg, peerName) { } } } + client.send(topic, msg, function(err){ if (err) { console.error('ws error: '+err); @@ -271,8 +281,8 @@ EventBroker.prototype._publish = function(topic, msg, peerName) { }); }; -EventBroker.prototype._onLocalPubsub = function(topic, data) { - this._publish(topic, data, this.zetta._name); +EventBroker.prototype._onLocalPubsub = function(topic, data, sourceTopic) { + this._publish(topic, sourceTopic, data, this.zetta._name); }; diff --git a/lib/event_socket.js b/lib/event_socket.js index 17ed1c5..fdbcfad 100644 --- a/lib/event_socket.js +++ b/lib/event_socket.js @@ -149,6 +149,16 @@ EventSocket.prototype.send = function(topic, data) { data = ObjectStream.format(topic, data.peer.properties()); } + if(Object.keys(data).length === 4 && typeof data.peer === 'object' && topic instanceof StreamTopic) { + data = { + data: data.peer.properties(), + subscriptionId: data.subscriptionId, + topic: data.topic, + timestamp: data.timestamp, + type: data.type + }; + } + try { data = JSON.stringify(data); } catch (err) { diff --git a/lib/pubsub_service.js b/lib/pubsub_service.js index 3cf07dc..4f28ce3 100644 --- a/lib/pubsub_service.js +++ b/lib/pubsub_service.js @@ -22,7 +22,7 @@ PubSub.prototype.subscribe = function(topic, callback) { var f = function(t, data) { if (topic.match(t)) { if (typeof callback === 'function') { - self._onCallback(topic, data, callback); + self._onCallback(topic, t, data, callback); } else if (typeof callback === 'object') { self._onResponse(topic, data, callback); } @@ -71,9 +71,9 @@ PubSub.prototype.unsubscribe = function(topic, listener) { } }; -PubSub.prototype._onCallback = function(topic, data, cb) { +PubSub.prototype._onCallback = function(topic, sourceTopic, data, cb) { var self = this; - cb(topic, data); + cb(topic, data, sourceTopic); }; PubSub.prototype._onResponse = function(topic, data, env) { diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 1edc72b..ab051ca 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -3,6 +3,172 @@ var WebSocket = require('ws'); var zetta = require('./..'); var zettacluster = require('zetta-cluster'); var Driver = require('./fixture/example_driver'); +var MemRegistry = require('./fixture/mem_registry'); +var MemPeerRegistry = require('./fixture/mem_peer_registry'); + +describe('Peering Event Streams', function() { + var cloud = null; + var cloudUrl = null; + var baseUrl = '/events'; + + beforeEach(function(done) { + cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + cloud.silent(); + cloud.listen(0, function(err) { + if(err) { + return done(err); + } + cloudUrl = 'http://localhost:' + cloud.httpServer.server.address().port; + done(); + }); + }); + + afterEach(function(done) { + cloud.httpServer.server.close(); + done(); + }); + + it('will receive a _peer/connect event when subscribed', function(done) { + var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + z.silent(); + z.listen(0, function(err) { + if(err) { + return done(err); + } + var zPort = z.httpServer.server.address().port; + var endpoint = 'localhost:' + zPort; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { type: 'subscribe', topic: '_peer/connect' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, '_peer/connect'); + assert(json.subscriptionId); + } else if(json.type === 'event') { + assert.equal(json.topic, '_peer/connect'); + done(); + } + }); + }); + ws.on('error', done); + z.link(cloudUrl); + }); + }); + + it('will receive a _peer/disconnect event when subscribed', function(done) { + var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + z.silent(); + z.pubsub.subscribe('_peer/connect', function(topic, data) { + var peer = data.peer; + peer.close(); + }); + z.listen(0, function(err) { + if(err) { + return done(err); + } + var zPort = z.httpServer.server.address().port; + var endpoint = 'localhost:' + zPort; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { type: 'subscribe', topic: '_peer/disconnect' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, '_peer/disconnect'); + assert(json.subscriptionId); + } else if(json.type === 'event') { + assert.equal(json.topic, '_peer/disconnect'); + done(); + } + }); + }); + ws.on('error', done); + z.link(cloudUrl); + }); + }); + + it('will receive a _peer/connect event when subscribed with wildcards', function(done) { + var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + z.silent(); + z.pubsub.subscribe('_peer/connect', function(topic, data) { + var peer = data.peer; + }); + z.listen(0, function(err) { + if(err) { + return done(err); + } + var zPort = z.httpServer.server.address().port; + var endpoint = 'localhost:' + zPort; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { type: 'subscribe', topic: '_peer/*' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, '_peer/*'); + assert(json.subscriptionId); + } else if(json.type === 'event') { + assert.equal(json.topic, '_peer/connect'); + done(); + } + }); + }); + ws.on('error', done); + z.link(cloudUrl); + }); + }); + it('will receive a _peer/connect and _peer/disconnect event when subscribed with wildcards', function(done) { + var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + z.silent(); + z.pubsub.subscribe('_peer/connect', function(topic, data) { + var peer = data.peer; + peer.close(); + }); + var recv = 0; + z.listen(0, function(err) { + if(err) { + return done(err); + } + var zPort = z.httpServer.server.address().port; + var endpoint = 'localhost:' + zPort; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { type: 'subscribe', topic: '_peer/*' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, '_peer/*'); + assert(json.subscriptionId); + } else if(json.type === 'event') { + recv++; + if(recv == 1) { + assert.equal(json.topic, '_peer/connect'); + } else if(recv == 2) { + assert.equal(json.topic, '_peer/disconnect'); + done(); + } + + } + }); + }); + ws.on('error', done); + z.link(cloudUrl); + }); + }); +}); describe('Event Streams', function() { var cluster = null; From 45c7fa143567ce739b9ec20008a8e5ecca981175 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Oct 2015 09:28:27 -0400 Subject: [PATCH 24/26] Updated to format the events link properly for SPDY connections. Added test to confirm appropriate API functionality. --- lib/api_resources/root.js | 9 +++++++- test/test_api.js | 43 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/lib/api_resources/root.js b/lib/api_resources/root.js index 1535e41..0cbe503 100644 --- a/lib/api_resources/root.js +++ b/lib/api_resources/root.js @@ -214,6 +214,13 @@ RootResource.prototype._queryDevices = function(env, next) { }; //This is called when ql and server isn't supplied RootResource.prototype._renderRoot = function(env, next) { + + var isForwardedProtocol = env.request.headers.hasOwnProperty('x-forwarded-proto') && ['http', 'https'].indexOf(env.request.headers['x-forwarded-proto']) !== -1; + var isSpdy = !!env.request.isSpdy && !isForwardedProtocol; + + + var eventsPath = env.helpers.url.path('/events'); + var wsEventsPath = isSpdy ? eventsPath : eventsPath.replace(/^http/, 'ws'); env.response.body = { class: ['root'], links: [ @@ -228,7 +235,7 @@ RootResource.prototype._renderRoot = function(env, next) { }, { rel: [rels.events], - href: env.helpers.url.path('/events').replace(/^http/, 'ws') + href: wsEventsPath } ], actions: [ diff --git a/test/test_api.js b/test/test_api.js index f29142c..006ce8e 100644 --- a/test/test_api.js +++ b/test/test_api.js @@ -219,6 +219,49 @@ describe('Zetta Api', function() { }).end(); }); + it('should have an events link formatted correctly for SPDY requests', function(done) { + var a = getHttpServer(app); + + if (!a.address()) a.listen(0); + + var agent = spdy.createAgent({ + host: '127.0.0.1', + port: a.address().port, + spdy: { + plain: true, + ssl: false + } + }); + + var request = http.get({ + host: '127.0.0.1', + port: a.address().port, + path: '/', + agent: agent + }, function(response) { + + var buffers = []; + response.on('readable', function() { + var data; + while ((data = response.read()) !== null) { + buffers.push(data); + } + }); + + response.on('end', function() { + var body = JSON.parse(Buffer.concat(buffers)); + var link = body.links.filter(function(l) { + return l.rel.indexOf('http://rels.zettajs.io/events') > -1; + })[0]; + var obj = require('url').parse(link.href, true); + assert.equal(obj.protocol, 'http:'); + agent.close(); + }); + + response.on('end', done); + }).end(); + }); + it('should have valid entities', function(done) { request(getHttpServer(app)) .get(url) From c40379f329bf47b408d86648aa1b6381fda49ea4 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Oct 2015 09:52:50 -0400 Subject: [PATCH 25/26] Updates to tag regex with beginning of input tags and end of input tags. --- lib/stream_topic.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/stream_topic.js b/lib/stream_topic.js index 38368f4..8ed49d3 100644 --- a/lib/stream_topic.js +++ b/lib/stream_topic.js @@ -39,7 +39,9 @@ StreamTopic.prototype.parse = function(topicString){ function checkForRegex(s) { if (typeof s === 'string') { if(s[0] === '{' && s[s.length - 1] === '}') { - return new RegExp(s.slice(1, -1)); + var regexString = s.slice(1, -1); + regexString = '^' + regexString + '$'; + return new RegExp(regexString); } else { return s; } From 2979cefadd1c0e7ff4b05a13f8ddac776fe880e6 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Oct 2015 10:04:15 -0400 Subject: [PATCH 26/26] Changes to SPDY events API. Remove the link becuase it won't work, --- lib/api_resources/root.js | 14 +++++++++----- test/test_api.js | 9 ++++----- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/lib/api_resources/root.js b/lib/api_resources/root.js index 0cbe503..cbafa00 100644 --- a/lib/api_resources/root.js +++ b/lib/api_resources/root.js @@ -220,7 +220,7 @@ RootResource.prototype._renderRoot = function(env, next) { var eventsPath = env.helpers.url.path('/events'); - var wsEventsPath = isSpdy ? eventsPath : eventsPath.replace(/^http/, 'ws'); + var wsEventsPath = eventsPath.replace(/^http/, 'ws'); env.response.body = { class: ['root'], links: [ @@ -233,10 +233,7 @@ RootResource.prototype._renderRoot = function(env, next) { rel: [rels.server], href: env.helpers.url.path('/servers/' + encodeURI(this.server.id) ) }, - { - rel: [rels.events], - href: wsEventsPath - } + ], actions: [ { @@ -258,6 +255,13 @@ RootResource.prototype._renderRoot = function(env, next) { ] }; + + if(!isSpdy) { + env.response.body.links.push({ + rel: [rels.events], + href: wsEventsPath + }); + } var peerQuery = Query.of('peers').ql('where direction = "acceptor" and status = "connected"'); this.server.peerRegistry.find(peerQuery, function(err, results) { if (results) { diff --git a/test/test_api.js b/test/test_api.js index 006ce8e..4e00c49 100644 --- a/test/test_api.js +++ b/test/test_api.js @@ -219,7 +219,7 @@ describe('Zetta Api', function() { }).end(); }); - it('should have an events link formatted correctly for SPDY requests', function(done) { + it('should not have an events link for SPDY requests', function(done) { var a = getHttpServer(app); if (!a.address()) a.listen(0); @@ -250,11 +250,10 @@ describe('Zetta Api', function() { response.on('end', function() { var body = JSON.parse(Buffer.concat(buffers)); - var link = body.links.filter(function(l) { + var links = body.links.filter(function(l) { return l.rel.indexOf('http://rels.zettajs.io/events') > -1; - })[0]; - var obj = require('url').parse(link.href, true); - assert.equal(obj.protocol, 'http:'); + }); + assert.equal(links.length, 0); agent.close(); });