Skip to content

Commit

Permalink
Implement scan.parallel. Closes #90
Browse files Browse the repository at this point in the history
  • Loading branch information
brandongoode committed May 31, 2017
1 parent 7529e11 commit d7f7f77
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 10 deletions.
10 changes: 10 additions & 0 deletions docs/_docs/scan.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ Recursively scan as long as lastKey exists. This function will also return a pro

`max` is the maximum number of recursive scans. Default: 0 - unlimited

### scan.parallel(totalSegments)

Preforms a parallel scan on the table.

`totalSegments` is the number of parallel scans

The results will be merged into a single array. `.lastKey` will be an array of `lastKey` objects.

**Warning** this can consume a lot of capacity.

### scan.and()

For readability only. Scans us AND logic for multiple attributes. `and()` does not provide any functionality and can be omitted.
Expand Down
86 changes: 76 additions & 10 deletions lib/Scan.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';
var Q = require('q');
var _ = require('lodash');
var debug = require('debug')('dynamoose:scan');

var errors = require('./errors');
Expand Down Expand Up @@ -86,10 +87,17 @@ Scan.prototype.exec = function (next) {
scanReq.Limit = options.limit;
}

if(options.ExclusiveStartKey) {
scanReq.ExclusiveStartKey = options.ExclusiveStartKey;
if(options.parallel) {
scanReq.TotalSegments = options.parallel;
}

if(Array.isArray(options.ExclusiveStartKey)) {
scanReq.TotalSegments = options.ExclusiveStartKey.length;
} else if(options.ExclusiveStartKey) {
options.ExclusiveStartKey = [options.ExclusiveStartKey];
}


if(options.conditionalOperator) {
scanReq.ConditionalOperator = options.conditionalOperator;
}
Expand All @@ -98,17 +106,28 @@ Scan.prototype.exec = function (next) {
scanReq.ConsistentRead = true;
}

function scan () {
function scanSegment (segment) {
var deferred = Q.defer();

var scanOneReq = _.clone(scanReq);

if(scanOneReq.TotalSegments) {
scanOneReq.Segment = segment;
}

if(options.ExclusiveStartKey) {
scanOneReq.ExclusiveStartKey = options.ExclusiveStartKey[segment];
}

debug('adding scan segement', scanOneReq);

var models = {}, totalCount = 0, scannedCount = 0, timesScanned = 0, lastKey;
if (!options.all) {
options.all = {'delay': 0, 'max': 1};
}
scanOne();
function scanOne() {
debug('scan request', scanReq);
Model.$__.base.ddb().scan(scanReq, function(err, data) {
debug('scan request', scanOneReq);
Model.$__.base.ddb().scan(scanOneReq, function(err, data) {
if(err) {
debug('Error returned by scan', err);
return deferred.reject(err);
Expand Down Expand Up @@ -143,7 +162,7 @@ Scan.prototype.exec = function (next) {
} else {
models = models.concat(data.Items.map(toModel));
}

if(options.one) {
if (!models || models.length === 0) {
return deferred.resolve();
Expand All @@ -155,10 +174,10 @@ Scan.prototype.exec = function (next) {
totalCount += data.Count;
scannedCount += data.ScannedCount;
timesScanned++;

if ((options.all.max === 0 || timesScanned < options.all.max) && lastKey) {
// scan.all need to scan again
scanReq.ExclusiveStartKey = lastKey;
scanOneReq.ExclusiveStartKey = lastKey;
setTimeout(scanOne, options.all.delay * 1000);
}
else {
Expand All @@ -175,10 +194,52 @@ Scan.prototype.exec = function (next) {
});
}

return deferred.promise.nodeify(next);
return deferred.promise;
}


function scan () {
var deferred = Q.defer();

var totalSegments = scanReq.TotalSegments || 1;
var scans = [];
for(var segment = 0; segment < totalSegments; segment++) {
scans.push(scanSegment(segment));
}
Q.all(scans)
.then(function (results) {
var models = _.flatten(results);
var lastKeys = results.map(function (r) {
return r.lastKey;
});

if(lastKeys.length === 1) {
models.lastKey = lastKeys[0];
} else if (_.compact(lastKeys).length !== 0){
models.lastKey = lastKeys;
}


models.count = results.reduce(function(acc, r) {
return acc + r.count;
}, 0);
models.scannedCount = results.reduce(function(acc, r) {
return acc + r.scannedCount;
}, 0);
models.timesScanned = results.reduce(function(acc, r) {
return acc + r.timesScanned;
}, 0);
deferred.resolve(models);

})
.fail(function (err) {
deferred.reject(err);
});

return deferred.promise.nodeify(next);

}

if(Model$.options.waitForActive) {
return Model$.table.waitForActive().then(scan);
}
Expand Down Expand Up @@ -436,4 +497,9 @@ Scan.prototype.all = function (delay, max) {
return this;
};

Scan.prototype.parallel = function (numberOfSegments) {
this.options.parallel = numberOfSegments;
return this;
};

module.exports = Scan;
45 changes: 45 additions & 0 deletions test/Scan.js
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,51 @@ describe('Scan', function (){
});
});

it('Scan parallel', function (done) {
var Dog = dynamoose.model('Dog');

Dog.scan().parallel(2).exec(function (err, dogs) {
should.not.exist(err);
dogs.length.should.eql(20);
done();
});
});


it('Scan with startAt array - implied parallel', function (done) {
var Dog = dynamoose.model('Dog');

Dog.scan().parallel(2).limit(2).exec()
.then(function (dogs) {
dogs.length.should.eql(4);
dogs.lastKey.length.should.eql(2);
dogs.count.should.eql(4);
dogs.scannedCount.should.eql(4);
dogs.timesScanned.should.eql(2);
return Dog.scan().startAt(dogs.lastKey).exec();
})
.then(function (more) {
more.length.should.eql(16);
more.count.should.eql(16);
more.scannedCount.should.eql(16);
more.timesScanned.should.eql(2);
done();
})
.catch(done);
});

it('Scan parallel all', function (done) {
var Dog = dynamoose.model('Dog');

Dog.scan().parallel(2).limit(2).all().exec()
.then(function (dogs) {
dogs.length.should.eql(20);
should.not.exist(dogs.lastKey);
done();
})
.catch(done);
});




Expand Down

0 comments on commit d7f7f77

Please sign in to comment.