diff --git a/lib/api_resources/root.js b/lib/api_resources/root.js index 6f0ddd0..cbafa00 100644 --- a/lib/api_resources/root.js +++ b/lib/api_resources/root.js @@ -214,6 +214,13 @@ RootResource.prototype._queryDevices = function(env, next) { }; //This is called when ql and server isn't supplied RootResource.prototype._renderRoot = function(env, next) { + + var isForwardedProtocol = env.request.headers.hasOwnProperty('x-forwarded-proto') && ['http', 'https'].indexOf(env.request.headers['x-forwarded-proto']) !== -1; + var isSpdy = !!env.request.isSpdy && !isForwardedProtocol; + + + var eventsPath = env.helpers.url.path('/events'); + var wsEventsPath = eventsPath.replace(/^http/, 'ws'); env.response.body = { class: ['root'], links: [ @@ -225,7 +232,8 @@ RootResource.prototype._renderRoot = function(env, next) { title: this.server._name, rel: [rels.server], href: env.helpers.url.path('/servers/' + encodeURI(this.server.id) ) - } + }, + ], actions: [ { @@ -247,6 +255,13 @@ RootResource.prototype._renderRoot = function(env, next) { ] }; + + if(!isSpdy) { + env.response.body.links.push({ + rel: [rels.events], + href: wsEventsPath + }); + } var peerQuery = Query.of('peers').ql('where direction = "acceptor" and status = "connected"'); this.server.peerRegistry.find(peerQuery, function(err, results) { if (results) { diff --git a/lib/event_broker.js b/lib/event_broker.js index ca72ba6..ad7d42a 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -1,8 +1,6 @@ +var JSCompiler = require('caql-js-compiler'); var querytopic = require('./query_topic'); - -function topicMatch(a, b) { - return a === b; -} +var StreamTopic = require('./stream_topic'); var EventBroker = module.exports = function(zetta) { this.peers = {}; @@ -14,6 +12,8 @@ var EventBroker = module.exports = function(zetta) { this._publishListeners = {}; // {: {: _listner } } this._deviceQueries = {}; + + this._queryCache = {}; }; EventBroker.prototype.peer = function(peer) { @@ -38,7 +38,10 @@ EventBroker.prototype._setupPublishListener = function(peer, topic) { } this._publishListeners[peer.name][topic] = function(data) { - self._publish(topic, data); + streamTopic = StreamTopic.parse(topic); + + + self._publish(streamTopic, null, data, peer.name); }; peer.on(topic, this._publishListeners[peer.name][topic]); @@ -70,12 +73,68 @@ EventBroker.prototype.client = function(client) { return stillValid && client.ws === cl.ws; }); - if (c.length > 0) { - return; - } - this.clients.push(client); + if (client.streamEnabled) { + function generateQuery(topic) { + return { + name: topic.serverName, + topic: topic.pubsubIdentifier() + }; + } + client.on('subscribe', function(subscription) { + var query = generateQuery(subscription.topic); + query.subscriptionId = subscription.subscriptionId; + query.limit = subscription.limit; + query.count = 0; + query.caql = subscription.topic.streamQuery; + client.query.push(query); + var connectedPeers = []; + + var subscribeToPeer = function(peerName) { + var copiedQuery = {}; + Object.keys(query).forEach(function(key) { + copiedQuery[key] = query[key]; + }); + + if(peerName) { + copiedQuery.name = peerName; + } + + if(connectedPeers.indexOf(copiedQuery.name) === -1) { + connectedPeers.push(copiedQuery.name); + + if(query.name instanceof RegExp && !query.name.exec(copiedQuery.name)) { + return; + } + self._subscribe(copiedQuery); + } + + } + + if(query.name instanceof RegExp || query.name === '*') { + self.zetta.pubsub.subscribe('_peer/connect', function(topic, data) { + subscribeToPeer(data.peer.name); + }); + Object.keys(self.peers).forEach(subscribeToPeer); + subscribeToPeer(self.zetta._name); + } else { + subscribeToPeer(); + } + + //listen peer connect events + //iterate through current peers + //array of peers that have been given the topic + + }); + + client.on('unsubscribe', function(subscription) { + var query = generateQuery(subscription.topic); + self._unsubscribe(query); + }); + } + + this.clients.push(client); client.on('close', function() { client.query.forEach(self._unsubscribe.bind(self)); var idx = self.clients.indexOf(client); @@ -92,7 +151,7 @@ EventBroker.prototype._subscribe = function(query) { var topic = query.topic; // is local - if (query.name === this.zetta.id) { + if (query.name === this.zetta.id || !query.name) { if (!this.subscriptions[topic]) { this.subscriptions[topic] = { count: 0, listener: null }; } @@ -110,7 +169,6 @@ EventBroker.prototype._subscribe = function(query) { this.subscriptions[topic].count++; } else { - if (!this._peerSubscriptions[query.name]) { this._peerSubscriptions[query.name] = {}; } @@ -129,11 +187,12 @@ EventBroker.prototype._subscribe = function(query) { EventBroker.prototype._unsubscribe = function(query) { var topic = query.topic; - if (query.name === this.zetta.id) { + if (query.name === this.zetta.id || !query.name) { this.subscriptions[topic].count--; if (this.subscriptions[topic].count > 0) { return; } + // unsubscribe locally this.zetta.pubsub.unsubscribe(topic, this.subscriptions[topic].listener); delete this.subscriptions[topic]; @@ -172,25 +231,58 @@ EventBroker.prototype._unsubscribe = function(query) { } }; +EventBroker.prototype._publish = function(topic, sourceTopic, msg, peerName) { + var self = this; + var originalTopic = msg.topic; -EventBroker.prototype._publish = function(topic, data) { this.clients.forEach(function(client) { client.query.forEach(function(query) { - if (!topicMatch(topic, query.topic)) { - return; + if (topic.match(query.topic)) { + + if (client.streamEnabled) { + msg.type = 'event'; + if(topic._useComponents) { + msg.topic = peerName + '/' + originalTopic; + } else { + if(topic._original.indexOf('*') === -1) { + msg.topic = topic._original; + } else { + msg.topic = sourceTopic; + } + } + msg.subscriptionId = query.subscriptionId; + + query.count++; + if (typeof query.limit === 'number' && query.count > query.limit) { + // unsubscribe broker + self._unsubscribe(query); + client._unsubscribe(query.subscriptionId) + return; + } + if (query.caql) { + + var compiled = client._queryCache[query.caql]; + var result = compiled.filterOne({ data: msg.data }); + if (result) { + msg.data = result[Object.keys(result)[0]]; + } else { + return; + } + } + } + + client.send(topic, msg, function(err){ + if (err) { + console.error('ws error: '+err); + } + }); } - - client.send(topic, data, function(err){ - if (err) { - console.error('ws error: '+err); - } - }); }); }); }; -EventBroker.prototype._onLocalPubsub = function(topic, data) { - this._publish(topic, data); +EventBroker.prototype._onLocalPubsub = function(topic, data, sourceTopic) { + this._publish(topic, sourceTopic, data, this.zetta._name); }; diff --git a/lib/event_socket.js b/lib/event_socket.js index 73c0e7a..fdbcfad 100644 --- a/lib/event_socket.js +++ b/lib/event_socket.js @@ -1,21 +1,139 @@ var util = require('util'); var EventEmitter = require('events').EventEmitter; var ObjectStream = require('zetta-streams').ObjectStream; +var EventStreamsParser = require('./event_streams_parser'); +var StreamTopic = require('./stream_topic'); var buildDeviceActions = require('./api_formats/siren/device.siren').buildActions; var deviceFormatter = require('./api_formats/siren/device.siren'); +var JSCompiler = require('caql-js-compiler'); -var EventSocket = module.exports = function(ws, query) { +//Flag to indicate that we expect data back on teh websocket +//Tracking subscriptions +var EventSocket = module.exports = function(ws, query, streamEnabled) { EventEmitter.call(this); this.ws = ws; + this.query = []; + this._queryCache = {}; - if (!Array.isArray(query)) { - query = [query]; + // list of event streams + this._subscriptions = []; + this._subscriptionIndex = 0; + this.streamEnabled = !!(streamEnabled); + + // only setup parser when using event stream + if (streamEnabled) { + var self = this; + this._parser = new EventStreamsParser(); + this._parser.on('error', function(err, original) { + var msg = { + type: 'error', + code: 400, + timestamp: new Date().getTime(), + topic: (typeof original === 'object') ? original.topic : null, + message: err.message + }; + self.ws.send(JSON.stringify(msg)); + }); + + this._parser.on('subscribe', function(msg) { + var topic = new StreamTopic(); + try { + topic.parse(msg.topic); + } catch(err) { + var msg = { + type: 'error', + code: 400, + timestamp: new Date().getTime(), + topic: msg.topic, + message: err.message + }; + self.ws.send(JSON.stringify(msg)); + return; + } + + if(topic.streamQuery && !self._queryCache[topic.streamQuery]) { + try { + var compiler = new JSCompiler(); + var compiled = compiler.compile(topic.streamQuery); + self._queryCache[topic.streamQuery] = compiled; + } catch(err) { + var msg = { + type: 'error', + code: 400, + timestamp: new Date().getTime(), + topic: msg.topic, + message: err.message + } + self.ws.send(JSON.stringify(msg)); + return; + } + } + + var subscription = { subscriptionId: ++self._subscriptionIndex, topic: topic, limit: msg.limit }; + self._subscriptions.push(subscription); + + var msg = { + type: 'subscribe-ack', + timestamp: new Date().getTime(), + topic: msg.topic, + subscriptionId: subscription.subscriptionId + }; + self.ws.send(JSON.stringify(msg)); + self.emit('subscribe', subscription); + }); + + this._parser.on('unsubscribe', function(msg) { + self._unsubscribe(msg.subscriptionId, function(err, subscription) { + if (subscription) { + self.emit('unsubscribe', subscription); + } + }); + }); + } else { + if (!Array.isArray(query)) { + query = [query]; + } + this.query = query; // contains .topic, .name } - this.query = query; // contains .topic, .name + this.init(); }; util.inherits(EventSocket, EventEmitter); +EventSocket.prototype._unsubscribe = function(subscriptionId, cb) { + var self = this; + var foundIdx = -1; + self._subscriptions.some(function(subscription, idx) { + if(subscription.subscriptionId === subscriptionId) { + foundIdx = idx; + return true; + } + }); + + if (foundIdx < 0) { + var msg = { + type: 'error', + code: 405, + timestamp: new Date().getTime(), + message: (new Error('Unable to unsubscribe from invalid subscriptionId')).message + }; + self.ws.send(JSON.stringify(msg)); + return; + } + + var subscription = self._subscriptions.splice(foundIdx, 1)[0]; + var msg = { + type: 'unsubscribe-ack', + timestamp: new Date().getTime(), + subscriptionId: subscription.subscriptionId + }; + + self.ws.send(JSON.stringify(msg)); + if (typeof cb === 'function') { + cb(null, subscription); + } +}; + EventSocket.prototype.send = function(topic, data) { if (!Buffer.isBuffer(data) && typeof data === 'object') { if (data['transitions']) { @@ -31,6 +149,16 @@ EventSocket.prototype.send = function(topic, data) { data = ObjectStream.format(topic, data.peer.properties()); } + if(Object.keys(data).length === 4 && typeof data.peer === 'object' && topic instanceof StreamTopic) { + data = { + data: data.peer.properties(), + subscriptionId: data.subscriptionId, + topic: data.topic, + timestamp: data.timestamp, + type: data.type + }; + } + try { data = JSON.stringify(data); } catch (err) { @@ -50,19 +178,17 @@ EventSocket.prototype.send = function(topic, data) { this.ws.send.apply(this.ws, args); }; -EventSocket.prototype.onData = function(data) { - var args = ['data'].concat(Array.prototype.slice.call(arguments)); - // @todo handle remote devices publishing data on the websocket - this.emit.apply(this, args); -}; - EventSocket.prototype.onClose = function() { this.emit('close'); }; EventSocket.prototype.init = function() { var self = this; - this.ws.on('message', this.onData.bind(this)); + this.ws.on('message', function(buffer) { + if (self.streamEnabled) { + self._parser.add(buffer); + } + }); this.ws.on('close', this.onClose.bind(this)); this.ws.on('error',function(err){ console.error('ws error:', err); diff --git a/lib/event_streams_parser.js b/lib/event_streams_parser.js new file mode 100644 index 0000000..a2618d9 --- /dev/null +++ b/lib/event_streams_parser.js @@ -0,0 +1,53 @@ +var util = require('util'); +var EventEmitter = require('events').EventEmitter; + +var EventStreamParser = module.exports = function() { + EventEmitter.call(this); +}; +util.inherits(EventStreamParser, EventEmitter); + +EventStreamParser.prototype.add = function(buf) { + var json = null; + var self = this; + try { + if(Buffer.isBuffer(buf)) { + json = JSON.parse(buf.toString()); + } else { + json = JSON.parse(buf); + } + } catch(e) { + self.emit('error', e, buf); + return; + } + + if (this.validate(json)) { + this.emit(json.type, json); + } else { + this.emit('error', new Error('Message validation failed.'), json); + } +}; + +EventStreamParser.prototype.validate = function(json) { + var properties = { + 'subscribe': { topic: 'string' }, + 'unsubscribe': { subscriptionId: 'number' }, + 'error': { code: 'number', timestamp: 'number', topic: 'string' }, + 'subscribe-ack': { timestamp: 'number', subscriptionId: 'number', topic: 'string' }, + 'unsubscribe-ack': { timestamp: 'number', subscriptionId: 'number' }, + 'event': { topic: 'string', timestamp: 'number', subscriptionId: 'number'} + } + + var keys = properties[json.type]; + var valid = true; + if(keys) { + Object.keys(keys).forEach(function(key) { + if(typeof json[key] !== keys[key]) { + valid = false; + } + }); + } else { + return false; + } + + return valid; +}; diff --git a/lib/http_server.js b/lib/http_server.js index 647ca48..659e183 100644 --- a/lib/http_server.js +++ b/lib/http_server.js @@ -225,8 +225,15 @@ ZettaHttpServer.prototype.setupEventSocket = function(ws) { if (/^\/events/.exec(ws.upgradeReq.url)) { self.wireUpWebSocketForEvent(ws, host, '/servers/' + self.zetta._name); + var queryString = url.parse(ws.upgradeReq.url).query; - var query = querystring.parse(url.parse(ws.upgradeReq.url).query); + if(!queryString) { + var client = new EventSocket(ws, null, true); + self.eventBroker.client(client); + return; + } + + var query = querystring.parse(queryString); function copy(q) { var c = {}; @@ -246,7 +253,7 @@ ZettaHttpServer.prototype.setupEventSocket = function(ws) { if (qt) { q.topic = querytopic.format(qt); } - var client = new EventSocket(ws, q); + var client = new EventSocket(ws, q, false); self.eventBroker.client(client); } }); diff --git a/lib/pubsub_service.js b/lib/pubsub_service.js index f1c0063..4f28ce3 100644 --- a/lib/pubsub_service.js +++ b/lib/pubsub_service.js @@ -1,4 +1,5 @@ var EventEmitter = require('events').EventEmitter; +var StreamTopic = require('./stream_topic'); var deviceFormatter = require('./api_formats/siren/device.siren'); var PubSub = module.exports = function() { @@ -9,33 +10,45 @@ var PubSub = module.exports = function() { PubSub.prototype.publish = function(topic, data) { var x = decodeURIComponent(topic); this.emitter.emit(x, data); + this.emitter.emit('_data', x, data); }; PubSub.prototype.subscribe = function(topic, callback) { - var f = null; - if (typeof callback === 'function') { - f = this._onCallback(topic, callback); - this.emitter.on(topic, f); - } else if (typeof callback === 'object') { - f = this._onResponse(topic, callback); - this.emitter.on(topic, f); - } else { - return; + var self = this; + if (typeof topic === 'string') { + topic = StreamTopic.parse(topic); } + + var f = function(t, data) { + if (topic.match(t)) { + if (typeof callback === 'function') { + self._onCallback(topic, t, data, callback); + } else if (typeof callback === 'object') { + self._onResponse(topic, data, callback); + } + } + }; + + this.emitter.on('_data', f); - if (!this._listeners[topic]) { - this._listeners[topic] = []; + if (!this._listeners[topic.hash()]) { + this._listeners[topic.hash()] = []; } - - this._listeners[topic].push({ listener: callback, actual: f }); + + this._listeners[topic.hash()].push({ listener: callback, actual: f }); }; PubSub.prototype.unsubscribe = function(topic, listener) { - if (!this._listeners[topic]) { + if (typeof topic === 'string') { + topic = StreamTopic.parse(topic); + } + + if (!this._listeners[topic.hash()]) { return; } + var found = -1; - this._listeners[topic].some(function(l, idx) { + this._listeners[topic.hash()].some(function(l, idx) { if (l.listener === listener) { found = idx; return true; @@ -50,59 +63,53 @@ PubSub.prototype.unsubscribe = function(topic, listener) { listener.response.end(); // end response for push request } - this.emitter.removeListener(topic, this._listeners[topic][found].actual); - this._listeners[topic].splice(found, 1); + this.emitter.removeListener('_data', this._listeners[topic.hash()][found].actual); + this._listeners[topic.hash()].splice(found, 1); - if (this._listeners[topic].length === 0) { - delete this._listeners[topic]; + if (this._listeners[topic.hash()].length === 0) { + delete this._listeners[topic.hash()]; } - }; -PubSub.prototype._onCallback = function(topic, cb) { +PubSub.prototype._onCallback = function(topic, sourceTopic, data, cb) { var self = this; - return function(data, options) { - cb(topic, data); - }; + cb(topic, data, sourceTopic); }; -PubSub.prototype._onResponse = function(topic, env) { +PubSub.prototype._onResponse = function(topic, data, env) { var self = this; - return function(data) { - var encoding = ''; - if(Buffer.isBuffer(data)) { - encoding = 'application/octet-stream'; - } else if (data.query && data.device) { - var serverId = env.route.params.serverId; - var loader = { path: '/servers/' + encodeURIComponent(serverId) }; - data = deviceFormatter({ loader: loader, env: env, model: data.device }); + var encoding = ''; + if(Buffer.isBuffer(data)) { + encoding = 'application/octet-stream'; + } else if (data.query && data.device) { + var serverId = env.route.params.serverId; + var loader = { path: '/servers/' + encodeURIComponent(serverId) }; + data = deviceFormatter({ loader: loader, env: env, model: data.device }); + data = new Buffer(JSON.stringify(data)); + } else if (typeof data == 'object') { + encoding = 'application/json'; + try { data = new Buffer(JSON.stringify(data)); - } else if (typeof data == 'object') { - encoding = 'application/json'; - try { - data = new Buffer(JSON.stringify(data)); - } catch (err) { - console.error(err, err.stack); - return; - } + } catch (err) { + console.error(err, err.stack); + return; + } + } else { + console.error('PubSub._onResponse encoding not set.'); + } + var stream = env.response.push('/' + topic.hash(), { 'Host': encodeURIComponent(serverId) + '.unreachable.zettajs.io', + 'Content-Length': data.length, + 'Content-Type': encoding + }); + + stream.on('error', function(err) { + if (err.code === 'RST_STREAM' && err.status === 3) { + stream.end(); } else { - console.error('PubSub._onResponse encoding not set.'); + console.error('PubSub._onCallback', err); } + }); - var stream = env.response.push('/' + topic, { 'Host': encodeURIComponent(serverId) + '.unreachable.zettajs.io', - 'Content-Length': data.length, - 'Content-Type': encoding - }); - - stream.on('error', function(err) { - if (err.code === 'RST_STREAM' && err.status === 3) { - stream.end(); - } else { - console.error('PubSub._onCallback', err); - } - }); - - stream.end(data); - }; + stream.end(data); }; diff --git a/lib/stream_topic.js b/lib/stream_topic.js new file mode 100644 index 0000000..8ed49d3 --- /dev/null +++ b/lib/stream_topic.js @@ -0,0 +1,160 @@ +var minimatch = require('minimatch'); + +var StreamTopic = module.exports = function() { + this.serverName = null; + this.deviceType = null; + this.deviceId = null; + this.streamName = null; + this.streamQuery = null; + this._original = null; + this._useComponents = false; +} + +StreamTopic.prototype.parse = function(topicString){ + this._original = topicString; + + var previousCharacter = null; + var currentCharacter = null; + var start = 0; + var topicComponents = []; + for(var i = 0; i < topicString.length; i++) { + currentCharacter = topicString[i]; + if(currentCharacter === '/' && previousCharacter !== '\\') { + topicComponents.push(topicString.slice(start, i)); + start = i + 1; + } else if(i === topicString.length - 1) { + topicComponents.push(topicString.slice(start, topicString.length)); + } + previousCharacter = currentCharacter; + } + + if (topicComponents.length < 3 && topicComponents.indexOf('**') === -1) { + return; + } + + if (topicComponents.length === 3) { + topicComponents.unshift(null); + } + + function checkForRegex(s) { + if (typeof s === 'string') { + if(s[0] === '{' && s[s.length - 1] === '}') { + var regexString = s.slice(1, -1); + regexString = '^' + regexString + '$'; + return new RegExp(regexString); + } else { + return s; + } + } + } + + // led/123/state + // _peer/connect + // _peer/disconnect + // query:asdasd + // query/asd + this._useComponents = true; + this.serverName = checkForRegex(topicComponents[0]); + this.deviceType = checkForRegex(topicComponents[1]); + this.deviceId = checkForRegex(topicComponents[2]); + if(topicComponents[3]) { + var streamComponents = topicComponents[3].split('?'); + this.streamName = checkForRegex(streamComponents[0]); + if (streamComponents[1]) { + this.streamQuery = streamComponents[1]; + } + } else { + this.streamName = undefined; + } +} + +StreamTopic.prototype.hash = function() { + return this._original; +}; + +StreamTopic.prototype.pubsubIdentifier = function() { + + function sanitizeRegex(part) { + if (part instanceof RegExp) { + return '{' + part.source + '}'; + } else { + return part; + } + } + + if (this._useComponents) { + return sanitizeRegex(this.deviceType) + '/' + sanitizeRegex(this.deviceId) + '/' + sanitizeRegex(this.streamName); + } else { + return this._original; + } +}; + +StreamTopic.prototype.match = function(topicString) { + if (this._useComponents) { + var components = [ this.serverName, this.deviceType, this.deviceId, this.streamName ].filter(function(i) { return i !== undefined });; + var checkedComponents = []; + var topicStringComponents = []; + var checkTopic = StreamTopic.parse(topicString); + var checkComponents = [ checkTopic.serverName, checkTopic.deviceType, checkTopic.deviceId, checkTopic.streamName ].filter(function(i) { return i !== undefined }); + var matchStart = null; + + //{^Det.+$}/led/** + //[RegExp, String] + //['Detroit-123', 'led/123/state'] + //RegExp -> 'Detroit-123' && String -> 'led/123/state' + + + //{^Det.+$}/led/*/{^sta.+$} + //[RegExp, String, RegExp] + //['Detroit-123', 'led/123', 'state'] + //RegExp -> 'Detroit-123' && String -> 'led/123/state' && RegExp -> 'state' + + //{^Det.+$}/**/{^stream.+$} + //[RegExp, String, RegExp] + //['Detroit-123', 'led/123', 'stream-123'] + + components.forEach(function(component, idx) { + if (component instanceof RegExp) { + if(matchStart !== null) { + var checkedComponent = components.slice(matchStart, idx).join('/'); + checkedComponents.push(checkedComponent); + if(checkedComponent === '**' && components.length < 4) { + topicStringComponents.push(checkComponents.slice(matchStart, idx + 1).join('/')); + topicStringComponents.push(checkComponents[idx + 1]); + return; + } else { + topicStringComponents.push(checkComponents.slice(matchStart, idx).join('/')); + } + matchStart = null; + } + checkedComponents.push(component); + topicStringComponents.push(checkComponents[idx]); + } else if(component !== undefined) { + if(matchStart === null) { + matchStart = idx; + } + if(idx === components.length - 1) { + checkedComponents.push(components.slice(matchStart).join('/')); + topicStringComponents.push(checkComponents.slice(matchStart).join('/')); + } + } + }); + + return checkedComponents.every(function(component, idx) { + var topicComponent = topicStringComponents[idx]; + if(component instanceof RegExp) { + return component.exec(topicComponent); + } else { + return minimatch(topicComponent, component); + } + }); + } else { + return minimatch(topicString, this._original); + } +}; + +StreamTopic.parse = function(topicString) { + var topic = new StreamTopic(); + topic.parse(topicString); + return topic; +} diff --git a/package.json b/package.json index f4c6a1f..fae2663 100644 --- a/package.json +++ b/package.json @@ -11,10 +11,12 @@ "calypso": "^1.0.0", "calypso-level": "^0.5.0", "calypso-query-decompiler": "^0.4.0", + "caql-js-compiler": "^0.5.0", "colors": "~0.6.2", "levelup": "^0.18.5", "medea": "^1.0.0", "medeadown": "^1.1.8", + "minimatch": "^3.0.0", "node-uuid": "^1.4.1", "revolt": "^0.7.0", "rx": "~2.2.20", @@ -25,7 +27,7 @@ "zetta-auto-scout": "^0.9.0", "zetta-device": "^0.17.0", "zetta-http-device": "^0.4.0", - "zetta-rels": "^0.4.0", + "zetta-rels": "^0.5.0", "zetta-scientist": "^0.5.0", "zetta-scout": "^0.6.0", "zetta-streams": "^0.3.0" diff --git a/test/fixture/example_driver.js b/test/fixture/example_driver.js index 8c9ed18..87e4bf4 100644 --- a/test/fixture/example_driver.js +++ b/test/fixture/example_driver.js @@ -27,6 +27,7 @@ TestDriver.prototype.init = function(config) { .monitor('foo') .stream('bar', this.streamBar) .stream('foobar', this.streamFooBar, {binary: true}) + .stream('fooobject', this.streamObject) .map('test-number', function(x, cb) { cb(); }, [{ name: 'value', type: 'number'}]) .map('test-text', function(x, cb) { cb(); }, [{ name: 'value', type: 'text'}]) .map('test-none', function(x, cb) { cb(); }, [{ name: 'value'}]) @@ -48,6 +49,10 @@ TestDriver.prototype.prepare = function(cb) { cb(); }; +TestDriver.prototype.streamObject = function(stream) { + this._streamObject = stream; +}; + TestDriver.prototype.returnError = function(error, cb) { cb(new Error(error)); }; @@ -59,6 +64,12 @@ TestDriver.prototype.incrementStreamValue = function() { } } +TestDriver.prototype.publishStreamObject = function(obj) { + if(this._streamObject) { + this._streamObject.write(obj); + } +}; + TestDriver.prototype.streamBar = function(stream) { this._stream = stream; } diff --git a/test/test_api.js b/test/test_api.js index 31e01d3..4e00c49 100644 --- a/test/test_api.js +++ b/test/test_api.js @@ -219,6 +219,48 @@ describe('Zetta Api', function() { }).end(); }); + it('should not have an events link for SPDY requests', function(done) { + var a = getHttpServer(app); + + if (!a.address()) a.listen(0); + + var agent = spdy.createAgent({ + host: '127.0.0.1', + port: a.address().port, + spdy: { + plain: true, + ssl: false + } + }); + + var request = http.get({ + host: '127.0.0.1', + port: a.address().port, + path: '/', + agent: agent + }, function(response) { + + var buffers = []; + response.on('readable', function() { + var data; + while ((data = response.read()) !== null) { + buffers.push(data); + } + }); + + response.on('end', function() { + var body = JSON.parse(Buffer.concat(buffers)); + var links = body.links.filter(function(l) { + return l.rel.indexOf('http://rels.zettajs.io/events') > -1; + }); + assert.equal(links.length, 0); + agent.close(); + }); + + response.on('end', done); + }).end(); + }); + it('should have valid entities', function(done) { request(getHttpServer(app)) .get(url) @@ -338,7 +380,7 @@ describe('Zetta Api', function() { request(getHttpServer(app)) .get('/') .expect(getBody(function(res, body) { - assert.equal(body.links.length, 3); + assert.equal(body.links.length, 4); hasLinkRel(body.links, 'self'); })) .end(done) @@ -353,6 +395,15 @@ describe('Zetta Api', function() { .end(done) }); + it('should contain link for event stream', function(done) { + request(getHttpServer(app)) + .get('/') + .expect(getBody(function(res, body) { + hasLinkRel(body.links, rels.events); + })) + .end(done) + }); + it('should use a default server name if none has been provided', function(done) { var app = zetta({ registry: reg, peerRegistry: peerRegistry }).silent()._run(); diff --git a/test/test_event_broker.js b/test/test_event_broker.js index d06d4d1..372da20 100644 --- a/test/test_event_broker.js +++ b/test/test_event_broker.js @@ -112,9 +112,6 @@ describe('EventBroker', function() { clientA.emit('close'); - done(); - return; - setTimeout(function() { assert.equal(recievedA, 1); assert.equal(recievedB, 2); diff --git a/test/test_event_socket.js b/test/test_event_socket.js index 967f264..266f46d 100644 --- a/test/test_event_socket.js +++ b/test/test_event_socket.js @@ -67,6 +67,24 @@ describe('EventSocket', function() { },1); }); + it('should init parser if passed streaming flag', function() { + var ws = new Ws(); + var client = new EventSocket(ws, 'some-topic', true); + assert(client._parser) + }) + + it('should emit subscribe event when subscribe message is parsed', function(done) { + var ws = new Ws(); + var client = new EventSocket(ws, 'some-topic', true); + client.on('subscribe', function(subscription) { + assert(subscription.subscriptionId); + assert(subscription.topic); + assert.equal(subscription.limit, 10); + done(); + }) + var msg = { type: 'subscribe', topic: 'Detroit/led/1234/state', limit: 10}; + ws.emit('message', new Buffer(JSON.stringify(msg))); + }) }); diff --git a/test/test_event_stream_parser.js b/test/test_event_stream_parser.js new file mode 100644 index 0000000..a0ea964 --- /dev/null +++ b/test/test_event_stream_parser.js @@ -0,0 +1,104 @@ +var EventStreamParser = require('../lib/event_streams_parser'); +var assert = require('assert'); + +describe('Event Stream Parser', function() { + it('validates subscribe messages correctly', function() { + var message = { type: 'subscribe', topic: 'Detroit/led/1234/state' }; + var parser = new EventStreamParser(); + assert(parser.validate(message)); + }); + + it('invalidates subscribe messages correctly', function() { + var message = { type: 'subscribe'}; + var parser = new EventStreamParser(); + assert(!parser.validate(message)); + }); + + it('validates unsubscribe messages correctly', function() { + var message = { type: 'unsubscribe', subscriptionId: 1 }; + var parser = new EventStreamParser(); + assert(parser.validate(message)); + }); + + it('invalidates unsubscribe messages correctly', function() { + var message = { type: 'unsubscribe' }; + var parser = new EventStreamParser(); + assert(!parser.validate(message)); + }); + + it('validates unsubscribe-ack messages correctly', function() { + var message = { type: 'unsubscribe-ack', subscriptionId: 1, timestamp: 1 }; + var parser = new EventStreamParser(); + assert(parser.validate(message)); + }); + + it('invalidates unsubscribe-ack messages correctly no subscriptionId', function() { + var message = { type: 'unsubscribe-ack', timestamp: 1 }; + var parser = new EventStreamParser(); + assert(!parser.validate(message)); + }); + + it('invalidates unsubscribe-ack messages correctly no timestamp no subscriptionId', function() { + var message = { type: 'unsubscribe-ack' }; + var parser = new EventStreamParser(); + assert(!parser.validate(message)); + }); + + it('invalidates unsubscribe-ack messages correctly no timestamp', function() { + var message = { type: 'unsubscribe-ack', timestamp: 1 }; + var parser = new EventStreamParser(); + assert(!parser.validate(message)); + }); + + it('validates subscribe-ack messages correctly', function() { + var message = { type: 'unsubscribe-ack', timestamp: 1, topic: 'Detroit/led/1234/state', subscriptionId: 1}; + var parser = new EventStreamParser(); + assert(parser.validate(message)); + }); + + it('validates error messages correctly', function() { + var message = { type: 'error', code: 1, timestamp: 1, topic: 'Detroit/led/1234/state' }; + var parser = new EventStreamParser(); + assert(parser.validate(message)); + }); + + it('validates event messages correctly', function() { + var message = { type: 'event', timestamp: 1, topic: 'Detroit/led/1234/state', subscriptionId: 1 }; + var parser = new EventStreamParser(); + assert(parser.validate(message)); + }); + + it('should emit event for message type when parsing buffer', function(done) { + var parser = new EventStreamParser(); + var message = { type: 'event', timestamp: 1, topic: 'Detroit/led/1234/state', subscriptionId: 1 }; + parser.on('event', function(msg) { + assert.equal(msg.type, message.type); + assert.equal(msg.timestamp, message.timestamp); + assert.equal(msg.topic, message.topic); + assert.equal(msg.subscriptionId, message.subscriptionId); + done(); + }); + + parser.add(new Buffer(JSON.stringify(message))); + }) + + it('should emit error for invalid message type when parsing buffer', function(done) { + var parser = new EventStreamParser(); + var message = { type: 'not-a-message', timestamp: 1, topic: 'Detroit/led/1234/state', subscriptionId: 1 }; + parser.on('error', function(msg) { + done(); + }); + + parser.add(new Buffer(JSON.stringify(message))); + }) + + it('should emit error for invalid JSON when parsing buffer', function(done) { + var parser = new EventStreamParser(); + parser.on('error', function(msg) { + done(); + }); + + parser.add(new Buffer('some text')); + }) + +}); diff --git a/test/test_event_streams.js b/test/test_event_streams.js new file mode 100644 index 0000000..bd192fc --- /dev/null +++ b/test/test_event_streams.js @@ -0,0 +1,831 @@ +var assert = require('assert'); +var WebSocket = require('ws'); +var zetta = require('./..'); +var zettacluster = require('zetta-cluster'); +var Driver = require('./fixture/example_driver'); +var MemRegistry = require('./fixture/mem_registry'); +var MemPeerRegistry = require('./fixture/mem_peer_registry'); + +describe('Peering Event Streams', function() { + var cloud = null; + var cloudUrl = null; + var baseUrl = '/events'; + + beforeEach(function(done) { + cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + cloud.silent(); + cloud.listen(0, function(err) { + if(err) { + return done(err); + } + cloudUrl = 'http://localhost:' + cloud.httpServer.server.address().port; + done(); + }); + }); + + afterEach(function(done) { + cloud.httpServer.server.close(); + done(); + }); + + it('will receive a _peer/connect event when subscribed', function(done) { + var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + z.silent(); + z.listen(0, function(err) { + if(err) { + return done(err); + } + var zPort = z.httpServer.server.address().port; + var endpoint = 'localhost:' + zPort; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { type: 'subscribe', topic: '_peer/connect' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, '_peer/connect'); + assert(json.subscriptionId); + } else if(json.type === 'event') { + assert.equal(json.topic, '_peer/connect'); + done(); + } + }); + }); + ws.on('error', done); + z.link(cloudUrl); + }); + }); + + it('will receive a _peer/disconnect event when subscribed', function(done) { + var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + z.silent(); + z.pubsub.subscribe('_peer/connect', function(topic, data) { + var peer = data.peer; + peer.close(); + }); + z.listen(0, function(err) { + if(err) { + return done(err); + } + var zPort = z.httpServer.server.address().port; + var endpoint = 'localhost:' + zPort; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { type: 'subscribe', topic: '_peer/disconnect' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, '_peer/disconnect'); + assert(json.subscriptionId); + } else if(json.type === 'event') { + assert.equal(json.topic, '_peer/disconnect'); + done(); + } + }); + }); + ws.on('error', done); + z.link(cloudUrl); + }); + }); + + it('will receive a _peer/connect event when subscribed with wildcards', function(done) { + var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + z.silent(); + z.pubsub.subscribe('_peer/connect', function(topic, data) { + var peer = data.peer; + }); + z.listen(0, function(err) { + if(err) { + return done(err); + } + var zPort = z.httpServer.server.address().port; + var endpoint = 'localhost:' + zPort; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { type: 'subscribe', topic: '_peer/*' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, '_peer/*'); + assert(json.subscriptionId); + } else if(json.type === 'event') { + assert.equal(json.topic, '_peer/connect'); + done(); + } + }); + }); + ws.on('error', done); + z.link(cloudUrl); + }); + }); + it('will receive a _peer/connect and _peer/disconnect event when subscribed with wildcards', function(done) { + var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + z.silent(); + z.pubsub.subscribe('_peer/connect', function(topic, data) { + var peer = data.peer; + peer.close(); + }); + var recv = 0; + z.listen(0, function(err) { + if(err) { + return done(err); + } + var zPort = z.httpServer.server.address().port; + var endpoint = 'localhost:' + zPort; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { type: 'subscribe', topic: '_peer/*' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, '_peer/*'); + assert(json.subscriptionId); + } else if(json.type === 'event') { + recv++; + if(recv == 1) { + assert.equal(json.topic, '_peer/connect'); + } else if(recv == 2) { + assert.equal(json.topic, '_peer/disconnect'); + done(); + } + + } + }); + }); + ws.on('error', done); + z.link(cloudUrl); + }); + }); +}); + +describe('Event Streams', function() { + var cluster = null; + var urls = []; + var baseUrl = '/events'; + var devices = []; + var validTopics = []; + + beforeEach(function(done) { + urls = []; + devices = []; + validTopics = []; + cluster = zettacluster({ zetta: zetta }) + .server('cloud') + .server('hub', [Driver, Driver], ['cloud']) + .server('hub2', [Driver, Driver], ['cloud']) + .on('ready', function() { + app = cluster.servers['cloud']; + urls.push('localhost:' + cluster.servers['cloud']._testPort); + urls.push('localhost:' + cluster.servers['hub']._testPort); + + ['hub', 'hub2'].forEach(function(hubname) { + Object.keys(cluster.servers[hubname].runtime._jsDevices).forEach(function(id) { + var device = cluster.servers[hubname].runtime._jsDevices[id]; + devices.push(device); + validTopics.push(hubname + '/' + device.type + '/' + device.id + '/state'); + }); + }) + done(); + }) + .run(function(err){ + if (err) { + return done(err); + } + }); + }); + + afterEach(function() { + cluster.stop(); + }); + + describe('Websocket API', function() { + var itBoth = function(testMsg, test) { + it('for cloud, ' + testMsg, test.bind(null, 0)); + it('for hub, ' + testMsg, test.bind(null, 1)); + }; + + itBoth('subscribing to a topic receives a subscription-ack', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { type: 'subscribe', topic: 'hub/led/1234/state' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, 'hub/led/1234/state'); + assert(json.subscriptionId); + done(); + }); + }); + ws.on('error', done); + }); + + itBoth('unsubscribing to a topic receives a unsubscription-ack', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + ws.on('open', function() { + var msg = { type: 'subscribe', topic: 'hub/led/1234/state' }; + ws.send(JSON.stringify(msg)); + ws.once('message', function(buffer) { + var json = JSON.parse(buffer); + var msg = { type: 'unsubscribe', subscriptionId: json.subscriptionId }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json2 = JSON.parse(buffer); + assert.equal(json2.type, 'unsubscribe-ack'); + assert(json2.timestamp); + assert.equal(json2.subscriptionId, json.subscriptionId); + done(); + }); + }); + }); + ws.on('error', done); + }); + + itBoth('verify error message format', function(){}); + + itBoth('specific topic subscription only receives messages with that topic', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var topic = validTopics[0]; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + + setTimeout(function() { + devices[0].call('change'); + }, 50); + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert.equal(json.subscriptionId, subscriptionId); + assert(json.data); + done(); + } + }); + }); + ws.on('error', done); + }); + + itBoth('multiple clients specific topic subscription only receives messages with that topic', function(idx, done) { + var endpoint = urls[idx]; + var topic = validTopics[0]; + + var connected = 0; + var recv = 0; + + var ws1 = new WebSocket('ws://' + endpoint + baseUrl); + ws1.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws1.send(JSON.stringify(msg)); + var subscriptionId = null; + ws1.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + connected++; + subscriptionId = json.subscriptionId; + if (connected === 2) { + setTimeout(function() { + devices[0].call('change'); + }, 50); + } + } else { + assert.equal(json.topic, topic); + assert.equal(json.subscriptionId, subscriptionId); + recv++; + if (recv === 2) { + done(); + } + } + }); + }); + ws1.on('error', done); + + var ws2 = new WebSocket('ws://' + endpoint + baseUrl); + ws2.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws2.send(JSON.stringify(msg)); + var subscriptionId = null; + ws2.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + subscriptionId = json.subscriptionId; + connected++; + if (connected === 2) { + setTimeout(function() { + devices[0].call('change'); + }, 50); + } + } else { + assert.equal(json.topic, topic); + assert.equal(json.subscriptionId, subscriptionId); + recv++; + if (recv === 2) { + done(); + } + } + }); + }); + ws2.on('error', done); + }); + + itBoth('wildcard server topic subscription only receives messages with that topic', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var topic = validTopics[0]; + topic = topic.replace('hub', '*'); + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + + setTimeout(function() { + devices[0].call('change'); + }, 50); + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert.equal(json.topic, validTopics[0]); + assert.equal(json.subscriptionId, subscriptionId); + assert(json.data); + done(); + } + }); + }); + ws.on('error', done); + }); + + + it('wildcard server topic subscription receives messages from both hubs', function(done) { + var endpoint = urls[0]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var topic = '*/testdriver/*/state'; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + var recv = 0; + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + + setTimeout(function() { + devices[0].call('change'); + devices[2].call('change'); + }, 50); + } else { + recv++; + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert.equal(json.subscriptionId, subscriptionId); + assert(json.data); + if (recv === 2) { + done(); + } + } + }); + }); + ws.on('error', done); + }); + + + itBoth('wildcard topic for single peer receives all messages for all topics', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/testdriver/*/state'; + var lastTopic = null; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + + setTimeout(function() { + devices[0].call('change'); + devices[1].call('change'); + }, 50); + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert.notEqual(json.topic, lastTopic); + lastTopic = json.topic; + assert.equal(json.subscriptionId, subscriptionId); + assert(json.data); + count++; + if(count === 2) { + done(); + } + } + }); + }); + ws.on('error', done); + }); + + itBoth('topic that doesnt exist still opens stream', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var topic = 'blah/foo/1/blah'; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.subscriptionId); + done(); + }); + }); + ws.on('error', done); + }); + + itBoth('wildcard and specific topic will each publish a message on a subscription', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var ackCount = 0; + var topicOne = validTopics[0]; + var topicTwo = 'hub/testdriver/*/state'; + ws.on('open', function() { + var msgOne = { type: 'subscribe', topic: topicOne }; + var msgTwo = { type: 'subscribe', topic: topicTwo }; + ws.send(JSON.stringify(msgOne)); + ws.send(JSON.stringify(msgTwo)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + ackCount++; + setTimeout(function() { + for(var i=0; i<11; i++) { + devices[0].call((i % 2 === 0) ? 'change' : 'prepare'); + } + }, 50); + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + count++; + if(count === 2) { + assert.equal(ackCount, 2); + done(); + } + } + }); + }); + ws.on('error', done); + }); + + itBoth('adding limit to subscription should limit number of messages received', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = validTopics[0]; + var data = null; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic, limit: 10 }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + + setTimeout(function() { + for(var i=0; i<11; i++) { + devices[0].call((i % 2 === 0) ? 'change' : 'prepare'); + } + }, 50); + } else if (json.type !== 'unsubscribe-ack') { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId, subscriptionId); + assert(json.data); + + count++; + if(count === 10) { + setTimeout(function() { + assert.equal(count, 10); + done(); + }, 200) + } + } + }); + }); + ws.on('error', done); + }); + + itBoth('when limit is reached a unsubscribe-ack should be received', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = validTopics[0]; + var data = null; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic, limit: 10 }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + setTimeout(function() { + for(var i=0; i<11; i++) { + devices[0].call((i % 2 === 0) ? 'change' : 'prepare'); + } + }, 50); + } else if(json.type === 'event') { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId, subscriptionId); + assert(json.data); + count++; + } else if(json.type === 'unsubscribe-ack') { + assert.equal(json.type, 'unsubscribe-ack'); + assert(json.timestamp); + assert.equal(json.subscriptionId, subscriptionId); + assert.equal(count, 10); + done(); + } + }); + }); + ws.on('error', done); + }); + + itBoth('query field selector should only return properties in selection', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/testdriver/' + devices[0].id + '/bar?select data where data >= 1'; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + setTimeout(function() { + devices[0].incrementStreamValue(); + }, 50); + } else if(json.type === 'event') { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId, subscriptionId); + assert(json.data); + done(); + } + }); + }); + ws.on('error', done); + }); + + itBoth('query field selector * should all properties in selection', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?select * where data.val >= 2'; + var data = { foo: 'bar', val: 2 }; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + setTimeout(function() { + devices[0].publishStreamObject(data); + }, 50); + } else if(json.type === 'event') { + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId, subscriptionId); + assert(json.data); + assert.equal(json.data.val, 2); + assert.equal(json.data.foo, 'bar'); + done(); + } + }); + }); + ws.on('error', done); + }); + + itBoth('query field selector should return only selected properties', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?select data.val'; + var data = { foo: 'bar', val: 2 }; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + setTimeout(function() { + devices[0].publishStreamObject(data); + }, 50); + } else if(json.type === 'event') { + assert(json.timestamp); + assert(json.topic); + assert(json.subscriptionId, subscriptionId); + assert(json.data); + assert.equal(json.data.val, 2); + assert.equal(json.data.foo, undefined); + done(); + } + }); + }); + ws.on('error', done); + }); + + describe('Protocol Errors', function() { + + itBoth('invalid stream query should result in a 400 error', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?invalid stream query'; + var data = { foo: 'bar', val: 2 }; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + done(); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert.equal(json.code, 400); + assert(json.message); + }); + }); + ws.on('error', done); + }); + + itBoth('invalid subscribe should result in a 400 error', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + var topic = 'hub/testdriver/' + devices[0].id + '/fooobject'; + ws.on('open', function() { + var msg = { type: 'subscribe' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + done(); + assert(json.timestamp); + assert.equal(json.code, 400); + assert(json.message); + }); + }); + ws.on('error', done); + }); + + itBoth('unsubscribing from an invalid subscriptionId should result in a 400 error', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + ws.on('open', function() { + var msg = { type: 'unsubscribe', subscriptionId: 123 }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + done(); + assert(json.timestamp); + assert.equal(json.code, 405); + assert(json.message); + }); + }); + ws.on('error', done); + }); + + itBoth('unsubscribing from a missing subscriptionId should result in a 400 error', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + ws.on('open', function() { + var msg = { type: 'unsubscribe' }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + done(); + assert(json.timestamp); + assert.equal(json.code, 400); + assert(json.message); + }); + }); + ws.on('error', done); + }); + + itBoth('on invalid message should result in a 400 error', function(idx, done){ + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var count = 0; + ws.on('open', function() { + var msg = { test: 123 }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + done(); + assert(json.timestamp); + assert.equal(json.code, 400); + assert(json.message); + }); + }); + ws.on('error', done); + }); + + + }) + + }); + + describe('SPDY API', function() { + }); + +}); diff --git a/test/test_stream_topic.js b/test/test_stream_topic.js new file mode 100644 index 0000000..3b4fa34 --- /dev/null +++ b/test/test_stream_topic.js @@ -0,0 +1,70 @@ +var StreamTopic = require('../lib/stream_topic'); +var assert = require('assert'); +var Query = require('calypso').Query; + +describe('Stream Topic', function() { + it('will correctly parse a topic of all valid strings', function() { + var t = new StreamTopic(); + t.parse('Detroit/led/1234/state'); + assert.equal(t.serverName, 'Detroit'); + assert.equal(t.deviceType, 'led'); + assert.equal(t.deviceId, '1234'); + assert.equal(t.streamName, 'state'); + }); + + it('will correctly parse a regex out of a topic string', function() { + var t = new StreamTopic(); + t.parse('{^Det.+$}/led/1234/state'); + assert(t.serverName.test); + assert.equal(t.deviceType, 'led'); + assert.equal(t.deviceId, '1234'); + assert.equal(t.streamName, 'state'); + }); + + it('will correctly parse a query out of a topic string', function() { + var t = new StreamTopic(); + t.parse('Detroit/led/1234/state?select * where data > 80'); + assert.equal(t.serverName, 'Detroit'); + assert.equal(t.deviceType, 'led'); + assert.equal(t.deviceId, '1234'); + assert.equal(t.streamName, 'state'); + assert.equal(t.streamQuery, 'select * where data > 80'); + }); + + it('will correctly parse topics without the leading server name', function() { + var t = new StreamTopic(); + t.parse('led/1234/state'); + assert.equal(t.serverName, null); + assert.equal(t.deviceType, 'led'); + assert.equal(t.deviceId, '1234'); + assert.equal(t.streamName, 'state'); + }) + + it('hash() will return the original input', function() { + var t = new StreamTopic(); + var topic = '{^Det.+$}/led/1234/state?select * where data > 80'; + t.parse(topic); + assert.equal(t.hash(), topic); + }) + + describe('.match()', function() { + + function matchTest(query, topic, eval) { + it('should return ' + eval + ' for query ' + query + ' on topic ' + topic, function() { + var t = StreamTopic.parse(query); + assert.equal(t.match(topic), eval); + }) + } + + matchTest('led/123/*', 'led/123/state', true); + matchTest('led/321/*', 'led/123/state', false); + matchTest('led/**', 'led/123/state', true); + matchTest('{^Det.+$}/led/123/state', 'Detroit-123/led/123/state', true); + matchTest('{^Det.+$}/led/123/state', 'hub/led/123/state', false); + matchTest('{^Det.+$}/led/**', 'Detroit-123/led/123/stream', true); + matchTest('{^Det.+$}/led/123/{^stream.+$}', 'Detroit-123/led/123/stream-123', true); + matchTest('{^Det.+$}/**/{^stream.+$}', 'Detroit-123/led/123/stream-123', true); + matchTest('{^Det.+$}/**', 'Detroit-123/led/123/stream', true); + }); + +}); diff --git a/test/test_zetta.js b/test/test_zetta.js index 7aa54a7..d32ed94 100644 --- a/test/test_zetta.js +++ b/test/test_zetta.js @@ -42,6 +42,7 @@ describe('Zetta', function() { var d = require('domain').create(); d.on('error', function(err) { assert.equal(err.message, '123'); + d.dispose() done(); }); d.run(function() {