Skip to content

Commit

Permalink
2.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeyBurkman committed Jul 18, 2016
1 parent 87707ea commit d0aa774
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 73 deletions.
38 changes: 38 additions & 0 deletions .jshintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"globals": {
"it": true,
"describe": true,
"beforeEach": true
},
"curly": true,
"camelcase": false,
"evil": false,
"expr": true,
"browser": true,
"trailing": true,
"sub": true,
"eqeqeq": false,
"eqnull": true,
"devel": false,
"smarttabs": false,
"laxbreak": false,
"laxcomma": true,
"jquery": false,
"loopfunc": true,
"indent": 2,
"bitwise": true,
"noarg": true,
"noempty": true,
"nonew": true,
"undef": true,
"boss": true,
"node": true,
"newcap": true,
"quotmark": "single",
"unused": true,
"strict": true,
"maxparams": 6,
"maxdepth": 5,
"maxstatements": 20,
"maxcomplexity": 10
}
27 changes: 26 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,29 @@
# x51
Utility for aggregating and flushing items to be processed

### TODO More documentation
## When do I use this?
Use this when you want to collect items, and flush them out to another function
when either you have enough items, or when enough time has passed.

## How do I use it?
```js
// require('x51') returns a function to initialize a new x51 instance
var x51 = require('x51')({
flushInterval: 60000, // Milliseconds between flushing, defaults to 60,000
maxRecords: Infinity, // Max items to collect before flushing, defaults to Infinity
flush: function(items) {
// Receives a non-empty array of items to be flushed.
// If this function throws an error, or returns a promise that is rejected,
// then these items are automatically kept, and will be flushed again next time.
console.log('Items: ', items);
}
});

x51.push('item1');
x51.push({
foo: 'Items can be anything'
});

// If you feel the need, you can also manually flush at any time...
x51.flush();
```
65 changes: 25 additions & 40 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,61 +1,46 @@
'use strict';

module.exports = function() {
module.exports = function(opts) {

var _configured = false;
var _flushInterval;
var _maxRecords;
var _flush;

var _log = _defaultLogger();
var _flush = opts.flush;
var _flushInterval = opts.flushInterval || 60000;
var _maxRecords = opts.maxRecords || Infinity;
var _log = opts.log || _defaultLogger();

if (!_flush) {
throw new Error('You must provide a `flush` funtion to init(opts)');
}

// Will flush using this setTimeout
var _timeout;
// Contains everything that hasn't been flushed yet
var _items = [];

var self = {
init: init,
push: push,
flush: flush
};

setFlushInterval();

return self;

/////

function init(opts) {
_configured = false;

_flush = opts.flush;
_flushInterval = opts.flushInterval || 60000;
_maxRecords = opts.maxRecords || Infinity;
_log = opts.log || _defaultLogger();

if (!_flush) {
throw new Error('You must provide a `flush` funtion to init(opts)');
function setFlushInterval() {
clearTimeout(_timeout);
if (_flushInterval < Infinity) {
_timeout = setTimeout(flush, _flushInterval);
}

_configured = true;

_timeout = setTimeout(flush, _flushInterval);

return self;
}

function flush() {
function flush(resetFlush) {

// If we flushed because we reached our max items, then make sure we don't
// try to automatically flush again until the flushInterval has passed
clearTimeout(_timeout);
_timeout = setTimeout(flush, _flushInterval);

if (_items.length === 0) {
return;
if (resetFlush) {
setFlushInterval();
}

if (!_configured) {
_log.warn('X51 is trying to flush records, but it has not been initialized yet');
if (_items.length === 0) {
return;
}

Expand All @@ -73,30 +58,30 @@ module.exports = function() {
_items = _items.concat(curBatch);
}

if (res && res.catch) {
if (res && res.then && res.catch) {
// Was probably a promise -- try catching and handling any errors
res.catch(function(err) {
_log.error(err, 'Error sending items');
// In case the error is transient, make sure we don't lose any logs
_items = _items.concat(curBatch);
});
}

// TODO: What if flush() is a callback function?
}

function push(item) {
_items.push(item);

if (_items.length >= _maxRecords) {
flush();
// If we flushed because we reached our max items, then make sure we don't
// try to automatically flush again until the flushInterval has passed
flush(true);
}
}

function _defaultLogger() {
var noop = function() {};
return {
warn: console.error.bind(console, '<WARN>'),
error: console.error.bind(console, '<ERROR>')
error: noop
};
}

Expand Down
37 changes: 7 additions & 30 deletions index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@ var Promise = require('bluebird');

describe(__filename, function() {

var x51;
var mod;

beforeEach(function() {
delete require.cache[require.resolve('./index')];
var mod = require('./index');
x51 = mod();
mod = require('./index');
});


it('Should flush according to the flush interval', function(done) {
var flush = sinon.stub();

x51.init({
var x51 = mod({
flush: flush,
flushInterval: 200
});
Expand All @@ -36,35 +35,13 @@ describe(__filename, function() {

});

it('Should not flush until it has been initialized', function() {

x51.push({});

x51.flush();

var flush = sinon.stub();

x51.init({
flush: flush
});

x51.push({});

x51.flush();

expect(flush.callCount).to.eql(1);

var items = flush.getCall(0).args[0];
expect(items.length).to.eql(2);
});

it('Should not lose records if flush throws an error', function() {
var flush = sinon.stub();

flush.onCall(0).throws(new Error());
flush.onCall(1).returns(undefined);

x51.init({
var x51 = mod({
flush: flush
});

Expand Down Expand Up @@ -94,7 +71,7 @@ describe(__filename, function() {
flush.onCall(0).returns(Promise.reject(new Error()));
flush.onCall(1).returns(Promise.resolve());

x51.init({
var x51 = mod({
flush: flush
});

Expand Down Expand Up @@ -125,7 +102,7 @@ describe(__filename, function() {
it('Should not flush automatically if the number of records is below the set threshold', function() {
var flush = sinon.stub();

x51.init({
var x51 = mod({
flush: flush,
maxRecords: 2
});
Expand All @@ -138,7 +115,7 @@ describe(__filename, function() {
it('Should proactively flush if the number of records passes the set threshold', function() {
var flush = sinon.stub();

x51.init({
var x51 = mod({
flush: flush,
maxRecords: 2
});
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "x51",
"version": "1.0.0",
"version": "2.0.0",
"description": "Utility for aggregating and flushing items to be processed",
"main": "index.js",
"scripts": {
Expand All @@ -13,7 +13,8 @@
"keywords": [
"batch",
"flush",
"process"
"process",
"aggregate"
],
"author": "MikeyBurkman",
"license": "MIT",
Expand Down

0 comments on commit d0aa774

Please sign in to comment.