Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket streams #243

Merged
merged 31 commits into from
Oct 26, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c7ea8af
Merge remote-tracking branch 'origin/protocol_update' into zetta-1.0-…
AdamMagaluk Sep 30, 2015
1678acb
Started tests for event streams
AdamMagaluk Sep 30, 2015
d0bfbb6
Adding tests for websocket refactor.
mdobson Oct 1, 2015
0365724
Added stream topic class for websocket stream protocol, and tests for…
mdobson Oct 1, 2015
0daba39
Added stream topic stuff.
mdobson Oct 1, 2015
f10dd38
Added a stream parser for the new websocket protocol.
mdobson Oct 1, 2015
97d42ff
EventSocket parses incoming messages when flag is set.
AdamMagaluk Oct 1, 2015
5b414b4
Wiring up protocol parser to websockets.
mdobson Oct 2, 2015
5333475
StreamTopic accepts any topic string, PubSub converts topic string to…
AdamMagaluk Oct 2, 2015
4ad5d87
Steam topics use limit parameter
AdamMagaluk Oct 6, 2015
70ba640
Added pubsubIdentifier to StreamTopic
AdamMagaluk Oct 6, 2015
7d31a8f
Started to implement wildcard matching
AdamMagaluk Oct 6, 2015
0b428f6
Merge branch 'master' of github.com:zettajs/zetta into websocket-streams
mdobson Oct 6, 2015
929b52e
Merge branch 'websocket-streams' of github.com:zettajs/zetta into web…
mdobson Oct 6, 2015
a395885
Topic matching on stream messages.
mdobson Oct 7, 2015
4c1c9ed
Progress on wild card server topics.
mdobson Oct 7, 2015
7d50e16
event broker keeps the original topic, fixed domain test
AdamMagaluk Oct 7, 2015
52dfb64
Added event stream test for multiple hubs
AdamMagaluk Oct 7, 2015
f04af80
Updated streamtopic to create caql query from stream query. Eventbrok…
AdamMagaluk Oct 12, 2015
9186e2b
Select statements will only work on the value of the data property in…
mdobson Oct 12, 2015
9e29323
Testing select statements and where queries.
mdobson Oct 12, 2015
d1c281f
Removed calypso, using caql-js-compiler for stream queries
AdamMagaluk Oct 12, 2015
a40fe2a
Added caql-js-compiler to package.json
AdamMagaluk Oct 12, 2015
a22db19
Cache compiled caql queries for your health.
mdobson Oct 12, 2015
59f1242
Added tests for client errors
AdamMagaluk Oct 15, 2015
3c1c240
Merge branch 'master' of github.com:zettajs/zetta into websocket-streams
mdobson Oct 22, 2015
915be0d
Updates to implementation to allow for _peer/connect and _peer/discon…
mdobson Oct 22, 2015
37791d4
Merge branch 'websocket-streams' of github.com:zettajs/zetta into web…
mdobson Oct 22, 2015
45c7fa1
Updated to format the events link properly for SPDY connections. Adde…
mdobson Oct 23, 2015
c40379f
Updates to tag regex with beginning of input tags and end of input tags.
mdobson Oct 23, 2015
2979cef
Changes to SPDY events API. Remove the link becuase it won't work,
mdobson Oct 23, 2015
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion lib/api_resources/root.js
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ RootResource.prototype._queryDevices = function(env, next) {
};
//This is called when ql and server isn't supplied
RootResource.prototype._renderRoot = function(env, next) {

var isForwardedProtocol = env.request.headers.hasOwnProperty('x-forwarded-proto') && ['http', 'https'].indexOf(env.request.headers['x-forwarded-proto']) !== -1;
var isSpdy = !!env.request.isSpdy && !isForwardedProtocol;


var eventsPath = env.helpers.url.path('/events');
var wsEventsPath = eventsPath.replace(/^http/, 'ws');
env.response.body = {
class: ['root'],
links: [
Expand All @@ -225,7 +232,8 @@ RootResource.prototype._renderRoot = function(env, next) {
title: this.server._name,
rel: [rels.server],
href: env.helpers.url.path('/servers/' + encodeURI(this.server.id) )
}
},

],
actions: [
{
Expand All @@ -247,6 +255,13 @@ RootResource.prototype._renderRoot = function(env, next) {
]
};


if(!isSpdy) {
env.response.body.links.push({
rel: [rels.events],
href: wsEventsPath
});
}
var peerQuery = Query.of('peers').ql('where direction = "acceptor" and status = "connected"');
this.server.peerRegistry.find(peerQuery, function(err, results) {
if (results) {
Expand Down
138 changes: 115 additions & 23 deletions lib/event_broker.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
var JSCompiler = require('caql-js-compiler');
var querytopic = require('./query_topic');

function topicMatch(a, b) {
return a === b;
}
var StreamTopic = require('./stream_topic');

var EventBroker = module.exports = function(zetta) {
this.peers = {};
Expand All @@ -14,6 +12,8 @@ var EventBroker = module.exports = function(zetta) {

this._publishListeners = {}; // {<server_name>: {<topic>: _listner } }
this._deviceQueries = {};

this._queryCache = {};
};

EventBroker.prototype.peer = function(peer) {
Expand All @@ -38,7 +38,10 @@ EventBroker.prototype._setupPublishListener = function(peer, topic) {
}

this._publishListeners[peer.name][topic] = function(data) {
self._publish(topic, data);
streamTopic = StreamTopic.parse(topic);


self._publish(streamTopic, null, data, peer.name);
};

peer.on(topic, this._publishListeners[peer.name][topic]);
Expand Down Expand Up @@ -70,12 +73,68 @@ EventBroker.prototype.client = function(client) {
return stillValid && client.ws === cl.ws;
});

if (c.length > 0) {
return;
}

this.clients.push(client);
if (client.streamEnabled) {
function generateQuery(topic) {
return {
name: topic.serverName,
topic: topic.pubsubIdentifier()
};
}
client.on('subscribe', function(subscription) {
var query = generateQuery(subscription.topic);
query.subscriptionId = subscription.subscriptionId;
query.limit = subscription.limit;
query.count = 0;
query.caql = subscription.topic.streamQuery;
client.query.push(query);
var connectedPeers = [];

var subscribeToPeer = function(peerName) {
var copiedQuery = {};
Object.keys(query).forEach(function(key) {
copiedQuery[key] = query[key];
});

if(peerName) {
copiedQuery.name = peerName;
}

if(connectedPeers.indexOf(copiedQuery.name) === -1) {
connectedPeers.push(copiedQuery.name);

if(query.name instanceof RegExp && !query.name.exec(copiedQuery.name)) {
return;
}
self._subscribe(copiedQuery);
}

}

if(query.name instanceof RegExp || query.name === '*') {
self.zetta.pubsub.subscribe('_peer/connect', function(topic, data) {
subscribeToPeer(data.peer.name);
});

Object.keys(self.peers).forEach(subscribeToPeer);
subscribeToPeer(self.zetta._name);
} else {
subscribeToPeer();
}

//listen peer connect events
//iterate through current peers
//array of peers that have been given the topic

});

client.on('unsubscribe', function(subscription) {
var query = generateQuery(subscription.topic);
self._unsubscribe(query);
});
}

this.clients.push(client);
client.on('close', function() {
client.query.forEach(self._unsubscribe.bind(self));
var idx = self.clients.indexOf(client);
Expand All @@ -92,7 +151,7 @@ EventBroker.prototype._subscribe = function(query) {
var topic = query.topic;

// is local
if (query.name === this.zetta.id) {
if (query.name === this.zetta.id || !query.name) {
if (!this.subscriptions[topic]) {
this.subscriptions[topic] = { count: 0, listener: null };
}
Expand All @@ -110,7 +169,6 @@ EventBroker.prototype._subscribe = function(query) {

this.subscriptions[topic].count++;
} else {

if (!this._peerSubscriptions[query.name]) {
this._peerSubscriptions[query.name] = {};
}
Expand All @@ -129,11 +187,12 @@ EventBroker.prototype._subscribe = function(query) {
EventBroker.prototype._unsubscribe = function(query) {
var topic = query.topic;

if (query.name === this.zetta.id) {
if (query.name === this.zetta.id || !query.name) {
this.subscriptions[topic].count--;
if (this.subscriptions[topic].count > 0) {
return;
}

// unsubscribe locally
this.zetta.pubsub.unsubscribe(topic, this.subscriptions[topic].listener);
delete this.subscriptions[topic];
Expand Down Expand Up @@ -172,25 +231,58 @@ EventBroker.prototype._unsubscribe = function(query) {
}
};

EventBroker.prototype._publish = function(topic, sourceTopic, msg, peerName) {
var self = this;
var originalTopic = msg.topic;

EventBroker.prototype._publish = function(topic, data) {
this.clients.forEach(function(client) {
client.query.forEach(function(query) {
if (!topicMatch(topic, query.topic)) {
return;
if (topic.match(query.topic)) {

if (client.streamEnabled) {
msg.type = 'event';
if(topic._useComponents) {
msg.topic = peerName + '/' + originalTopic;
} else {
if(topic._original.indexOf('*') === -1) {
msg.topic = topic._original;
} else {
msg.topic = sourceTopic;
}
}
msg.subscriptionId = query.subscriptionId;

query.count++;
if (typeof query.limit === 'number' && query.count > query.limit) {
// unsubscribe broker
self._unsubscribe(query);
client._unsubscribe(query.subscriptionId)
return;
}
if (query.caql) {

var compiled = client._queryCache[query.caql];
var result = compiled.filterOne({ data: msg.data });
if (result) {
msg.data = result[Object.keys(result)[0]];
} else {
return;
}
}
}

client.send(topic, msg, function(err){
if (err) {
console.error('ws error: '+err);
}
});
}

client.send(topic, data, function(err){
if (err) {
console.error('ws error: '+err);
}
});
});
});
};

EventBroker.prototype._onLocalPubsub = function(topic, data) {
this._publish(topic, data);
EventBroker.prototype._onLocalPubsub = function(topic, data, sourceTopic) {
this._publish(topic, sourceTopic, data, this.zetta._name);
};


Expand Down
Loading