V0.2 Performance Improvements

darach edited this page Nov 19, 2012 · 3 revisions

Prior to v0.2, the sliding-window algorithm had to maintain a ring buffer of the window size, even though the cost of accumulating events was amortised. As N grew, each event had to be cascaded back to every open window so that, on a sliding basis, the next event emitted would always be representative of the last N events.

For small window sizes, or relatively low-frequency event streams, this is not a major issue.

The Problem

The algorithm for submitting events into a sliding window was as follows:

  // idx, size, mark and self are closure state maintained by the window
  var old_enqueue = function(buf, v) {
    var i = idx;
    // Accumulate event to current and any prior active slots
    // NOTE: Houston, we have a problem. O(N^2)
    do {
      buf[i-- % size].accumulate(v);
    } while (i + size > idx && i >= 0);
    // If we've hit our size
    if (idx >= mark) {
      // Find the slot
      var x = (i + 1) % size;
      // Emit the result
      var r = buf[x].emit();
      self.emit('emit', r);
      // 'close' the active window; 'open' a new window in that slot
      buf[x].init();
    }
    idx += 1;
  };

The problem here is the loop. Remember that we are accumulating a set of events over a window? You can think of that as the outer loop. The inner loop, once the window is full (and it should be most of the time), is of the same size. So a high frequency of events over a large window N degrades quadratically, as the O(N^2) note above suggests. Most CEP implementations that I'm aware of suffer from this.
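A hypothetical back-of-the-envelope sketch (my names, not the library's API) makes the cost visible: count how many accumulate() calls the old cascading scheme performs, since each event touches every open slot behind it.

```javascript
// Hypothetical cost model for the old algorithm: each arriving event is
// accumulated into the current slot and into every other open slot,
// which is up to `size` slots once the window is full.
function oldWindowCost(size, events) {
  var calls = 0;
  for (var idx = 0; idx < events; idx++) {
    var open = Math.min(idx + 1, size); // slots currently open
    calls += open;                      // one accumulate() per open slot
  }
  return calls;
}

console.log(oldWindowCost(10, 1000));   // 9955 accumulate calls
console.log(oldWindowCost(1000, 1000)); // 500500 -- grows with N, not events
```

Same number of events, two orders of magnitude more work once the window size catches up with the event count.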

It performed like a bad dive as the gravity of window size increased.

Le Resolution?

The new algorithm is simpler and faster.

  self.enqueue = function(v) {
    if (idx >= mark) {
      // Window is full: accumulate the new event and emit
      fn.accumulate(v);
      self.emit('emit', fn.emit());
      var po = (idx + 1) % size; // prior open slot, holding the oldest value
      fn.compensate(p[po]);      // reverse the effect of the evicted value
      p[idx % size] = v;
    } else {
      // Window still filling up
      fn.accumulate(v);
      p[idx] = v;
    }
    idx += 1;
  };

Notice the absence of the loop? In this revision, when a window closes, the effect of the value that is about to be evicted by the next event is reversed by calling a new compensate function on the aggregate function API.

So, if we were, say, counting events, we would 'uncount' an event. This is a really simple but powerful observation, at the small cost of having to maintain compensating functions. As it turns out these are really cheap compared to back-propagating events: the per-event cost will be constant most of the time (though this really depends on the aggregate function implementation).
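To make 'uncounting' concrete, here is a toy sketch (standalone, my names, not the library's API):

```javascript
// 'Uncounting' in miniature: a window of 3 events, O(1) work per event.
var n = 0;
function accumulate() { n += 1; }  // count an arriving event
function compensate() { n -= 1; }  // 'uncount' the evicted one

accumulate(); accumulate(); accumulate(); // window is full: n === 3
accumulate();   // 4th event arrives: n === 4
compensate();   // oldest event falls out of the window
console.log(n); // 3 -- still the size of the window
```

However many events arrive, each one costs exactly one accumulate and at most one compensate.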

The cost of cleverness, though, was having to change the Aggregate Function API to introduce the necessary compensation when a window closes. I'm calling these 'compensating aggregate functions', for want of a better term.

An example compensating aggregate function

var util = require('util');

// Count all the things
function CountFunction() {
  var self = this, n;
  self.name = "count";
  self.init = function() { n = 0; };                // open a fresh window
  self.accumulate = function(ignored) { n += 1; };  // count an event
  self.compensate = function(ignored) { n -= 1; };  // 'uncount' an evicted event
  self.emit = function() { return n; };
  self.make = function() { return new CountFunction(); };
}
util.inherits(CountFunction, AggregateFunction);
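For illustration, here is a minimal self-contained sketch of the whole scheme end to end: a size-3 window driven by a hypothetical compensating sum. The SumFunction and slide names are mine, not part of the library, and the wiring is simplified from the real enqueue above.

```javascript
// Hypothetical compensating sum, in the same shape as the count example.
function SumFunction() {
  var self = this, s;
  self.init = function() { s = 0; };
  self.accumulate = function(v) { s += v; };
  self.compensate = function(v) { s -= v; }; // 'unsum' the evicted value
  self.emit = function() { return s; };
}

// Minimal sliding-window driver: ring buffer p, no inner loop.
function slide(size, fn, values, onEmit) {
  var p = new Array(size), idx = 0;
  fn.init();
  values.forEach(function(v) {
    fn.accumulate(v);
    if (idx >= size - 1) {                // window is full
      onEmit(fn.emit());
      fn.compensate(p[(idx + 1) % size]); // reverse the soon-to-be-evicted value
    }
    p[idx % size] = v;
    idx += 1;
  });
}

var out = [];
slide(3, new SumFunction(), [1, 2, 3, 4, 5], function(r) { out.push(r); });
console.log(out); // [ 6, 9, 12 ]
```

Each emitted sum covers the last 3 values (1+2+3, 2+3+4, 3+4+5), with constant work per event.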

Conclusion

This is about as linear as sliding windows are going to get. They cost a little more than tumbling windows, but not a lot.

A picture, sans Swans, says a thousand words.

Straight enough, methinks.

There may be a hidden bonus here too that will be exploited in version.next!