diff --git a/.jshintrc b/.jshintrc new file mode 100644 index 0000000..fa0311d --- /dev/null +++ b/.jshintrc @@ -0,0 +1,38 @@ +{ + "globals": { + "it": true, + "describe": true, + "beforeEach": true + }, + "curly": true, + "camelcase": false, + "evil": false, + "expr": true, + "browser": true, + "trailing": true, + "sub": true, + "eqeqeq": false, + "eqnull": true, + "devel": false, + "smarttabs": false, + "laxbreak": false, + "laxcomma": true, + "jquery": false, + "loopfunc": true, + "indent": 2, + "bitwise": true, + "noarg": true, + "noempty": true, + "nonew": true, + "undef": true, + "boss": true, + "node": true, + "newcap": true, + "quotmark": "single", + "unused": true, + "strict": true, + "maxparams": 6, + "maxdepth": 5, + "maxstatements": 20, + "maxcomplexity": 10 +} diff --git a/README.md b/README.md index 9d1545e..fed3d55 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,29 @@ # x51 Utility for aggregating and flushing items to be processed -### TODO More documentation \ No newline at end of file +## When do I use this? +Use this when you want to collect items, and flush them out to another function +when either you have enough items, or when enough time has passed. + +## How do I use it? +```js +// require('x51') returns a function to initialize a new x51 instance +var x51 = require('x51')({ + flushInterval: 60000, // Milliseconds between flushing, defaults to 60,000 + maxRecords: Infinity, // Max items to collect before flushing, defaults to Infinity + flush: function(items) { + // Receives a non-empty array of items to be flushed. + // If this function throws an error, or returns a promise that is rejected, + // then these items are automatically kept, and will be flushed again next time. + console.log('Items: ', items); + } +}); + +x51.push('item1'); +x51.push({ + foo: 'Items can be anything' +}); + +// If you feel the need, you can also manually flush at any time... +x51.flush(); +``` \ No newline at end of file diff --git a/index.js b/index.js index 18f530b..463a719 100644 --- a/index.js +++ b/index.js @@ -1,13 +1,15 @@ 'use strict'; -module.exports = function() { +module.exports = function(opts) { - var _configured = false; - var _flushInterval; - var _maxRecords; - var _flush; - - var _log = _defaultLogger(); + var _flush = opts.flush; + var _flushInterval = opts.flushInterval || 60000; + var _maxRecords = opts.maxRecords || Infinity; + var _log = opts.log || _defaultLogger(); + + if (!_flush) { + throw new Error('You must provide a `flush` funtion to init(opts)'); + } // Will flush using this setTimeout var _timeout; @@ -15,47 +17,30 @@ module.exports = function() { var _items = []; var self = { - init: init, push: push, flush: flush }; + setFlushInterval(); + return self; ///// - function init(opts) { - _configured = false; - - _flush = opts.flush; - _flushInterval = opts.flushInterval || 60000; - _maxRecords = opts.maxRecords || Infinity; - _log = opts.log || _defaultLogger(); - - if (!_flush) { - throw new Error('You must provide a `flush` funtion to init(opts)'); + function setFlushInterval() { + clearTimeout(_timeout); + if (_flushInterval < Infinity) { + _timeout = setTimeout(flush, _flushInterval); } - - _configured = true; - - _timeout = setTimeout(flush, _flushInterval); - - return self; } - function flush() { + function flush(resetFlush) { - // If we flushed because we reached our max items, then make sure we don't - // try to automatically flush again until the flushInterval has passed - clearTimeout(_timeout); - _timeout = setTimeout(flush, _flushInterval); - - if (_items.length === 0) { - return; + if (resetFlush) { + setFlushInterval(); } - if (!_configured) { - _log.warn('X51 is trying to flush records, but it has not been initialized yet'); + if (_items.length === 0) { return; } @@ -73,7 +58,7 @@ module.exports = function() { _items = _items.concat(curBatch); } - if (res && res.catch) { + if (res && res.then && res.catch) { // Was probably a promise -- try catching and handling any errors res.catch(function(err) { _log.error(err, 'Error sending items'); @@ -81,22 +66,22 @@ module.exports = function() { _items = _items.concat(curBatch); }); } - - // TODO: What if flush() is a callback function? } function push(item) { _items.push(item); if (_items.length >= _maxRecords) { - flush(); + // If we flushed because we reached our max items, then make sure we don't + // try to automatically flush again until the flushInterval has passed + flush(true); } } function _defaultLogger() { + var noop = function() {}; return { - warn: console.error.bind(console, ''), - error: console.error.bind(console, '') + error: noop }; } diff --git a/index.test.js b/index.test.js index 4528795..7379b55 100644 --- a/index.test.js +++ b/index.test.js @@ -6,19 +6,18 @@ var Promise = require('bluebird'); describe(__filename, function() { - var x51; + var mod; beforeEach(function() { delete require.cache[require.resolve('./index')]; - var mod = require('./index'); - x51 = mod(); + mod = require('./index'); }); it('Should flush according to the flush interval', function(done) { var flush = sinon.stub(); - x51.init({ + var x51 = mod({ flush: flush, flushInterval: 200 }); @@ -36,35 +35,13 @@ describe(__filename, function() { }); - it('Should not flush until it has been initialized', function() { - - x51.push({}); - - x51.flush(); - - var flush = sinon.stub(); - - x51.init({ - flush: flush - }); - - x51.push({}); - - x51.flush(); - - expect(flush.callCount).to.eql(1); - - var items = flush.getCall(0).args[0]; - expect(items.length).to.eql(2); - }); - it('Should not lose records if flush throws an error', function() { var flush = sinon.stub(); flush.onCall(0).throws(new Error()); flush.onCall(1).returns(undefined); - x51.init({ + var x51 = mod({ flush: flush }); @@ -94,7 +71,7 @@ describe(__filename, function() { flush.onCall(0).returns(Promise.reject(new Error())); flush.onCall(1).returns(Promise.resolve()); - x51.init({ + var x51 = mod({ flush: flush }); @@ -125,7 +102,7 @@ describe(__filename, function() { it('Should not flush automatically if the number of records is below the set threshold', function() { var flush = sinon.stub(); - x51.init({ + var x51 = mod({ flush: flush, maxRecords: 2 }); @@ -138,7 +115,7 @@ describe(__filename, function() { it('Should proactively flush if the number of records passes the set threshold', function() { var flush = sinon.stub(); - x51.init({ + var x51 = mod({ flush: flush, maxRecords: 2 }); diff --git a/package.json b/package.json index ff12b27..1ee9661 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "x51", - "version": "1.0.0", + "version": "2.0.0", "description": "Utility for aggregating and flushing items to be processed", "main": "index.js", "scripts": { @@ -13,7 +13,8 @@ "keywords": [ "batch", "flush", - "process" + "process", + "aggregate" ], "author": "MikeyBurkman", "license": "MIT",