Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for UpdateShardCound & DescribeLimits #48

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# EditorConfig helps developers define and maintain consistent
# coding styles between different editors and IDEs
# editorconfig.org

root = true

[*]

# Change these settings to your own preference
indent_style = space
indent_size = 2

# We recommend you to keep these unchanged
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
22 changes: 3 additions & 19 deletions actions/createStream.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
var BigNumber = require('bignumber.js'),
db = require('../db')

var POW_128 = new BigNumber(2).pow(128),
SEQ_ADJUST_MS = 2000
var db = require('../db'), createShards = require('./util.createShards')

module.exports = function createStream(store, data, cb) {

Expand Down Expand Up @@ -30,20 +26,6 @@ module.exports = function createStream(store, data, cb) {
'for current limits and how to request higher limits.'))
}

var i, shards = new Array(data.ShardCount), shardHash = POW_128.div(data.ShardCount).floor(),
createTime = Date.now() - SEQ_ADJUST_MS, stream
for (i = 0; i < data.ShardCount; i++) {
shards[i] = {
HashKeyRange: {
StartingHashKey: shardHash.times(i).toFixed(),
EndingHashKey: (i < data.ShardCount - 1 ? shardHash.times(i + 1) : POW_128).minus(1).toFixed(),
},
SequenceNumberRange: {
StartingSequenceNumber: db.stringifySequence({shardCreateTime: createTime, shardIx: i}),
},
ShardId: db.shardIdName(i),
}
}
stream = {
RetentionPeriodHours: 24,
EnhancedMonitoring: [{ShardLevelMetrics: []}],
Expand All @@ -56,6 +38,8 @@ module.exports = function createStream(store, data, cb) {
_tags: Object.create(null), // Hidden data, remove when returning
}

var shards = createShards(data.ShardCount)

metaDb.put(key, stream, function(err) {
if (err) return cb(err)

Expand Down
10 changes: 10 additions & 0 deletions actions/describeLimits.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
var db = require('../db')

module.exports = function describeLimits(store, data, cb) {

db.sumShards(store, function(err, shardSum) {
if (err) return cb(err)

cb(null, {OpenShardCount: shardSum, ShardLimit: store.shardLimit})
})
}
96 changes: 96 additions & 0 deletions actions/updateShardCount.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
var BigNumber = require('bignumber.js'),
db = require('../db'), createShards = require('./util.createShards')

module.exports = function UpdateShardCount(store, data, cb) {

var metaDb = store.metaDb, key = data.StreamName, TargetShardCount = data.TargetShardCount

metaDb.lock(key, function(release) {
cb = release(cb)

store.getStream(key, function(err, stream) {
if (err) return cb(err)

if (stream.StreamStatus != 'ACTIVE') {
return cb(db.clientError('ResourceInUseException',
'Stream ' + data.StreamName + ' under account ' + metaDb.awsAccountId +
' not ACTIVE, instead in state ' + stream.StreamStatus))
}

var openShards = stream.Shards.filter(function(shard) {
return shard.SequenceNumberRange.EndingSequenceNumber == null
}).length

if (TargetShardCount > store.shardLimit) {

return cb(db.clientError('LimitExceededException',
'Target shard count or number of open shards cannot be greater than ' + store.shardLimit + '. ' +
'Current open shard count: ' + openShards + ', Target shard count: ' + TargetShardCount))
}

if (TargetShardCount > openShards * 2) {

return cb(db.clientError('LimitExceededException',
'UpdateShardCount cannot scale up over double your current open shard count. ' +
'Current open shard count: ' + openShards + ', Target shard count: ' + TargetShardCount))
}

if (TargetShardCount < openShards / 2) {
return cb(db.clientError('LimitExceededException',
'UpdateShardCount cannot scale down below half your current open shard count. ' +
'Current open shard count: ' + openShards + ', Target shard count: ' + TargetShardCount))
}

if (stream.StreamStatus != 'ACTIVE') {
return cb(db.clientError('ResourceInUseException',
'Stream ' + key + ' under account ' + metaDb.awsAccountId +
' not ACTIVE, instead in state ' + stream.StreamStatus))
}

stream.StreamStatus = 'UPDATING'

db.sumShards(store, function(err, shardSum) {
if (err) return cb(err)

metaDb.put(key, stream, function(err) {
if (err) return cb(err)

setTimeout(function() {

metaDb.lock(key, function(release) {
cb = release(function(err) {
if (err && !/Database is not open/.test(err)) console.error(err.stack || err)
})

store.getStream(key, function(err, stream) {
if (err && err.name == 'NotFoundError') return cb()
if (err) return cb(err)

stream.Shards.forEach(function(shard, index) {

var now = Date.now()

shard.SequenceNumberRange.EndingSequenceNumber = db.stringifySequence({
shardCreateTime: db.parseSequence(shard.SequenceNumberRange.StartingSequenceNumber).shardCreateTime,
shardIx: index,
seqIx: new BigNumber('7fffffffffffffff', 16).toFixed(),
seqTime: now,
})

})

stream.Shards = stream.Shards.concat(createShards(TargetShardCount, shardSum))
stream.StreamStatus = 'ACTIVE'

metaDb.put(key, stream, cb)
})
})

}, store.updateStreamMs)

cb()
})
})
})
})
}
27 changes: 27 additions & 0 deletions actions/util.createShards.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
var db = require('../db'),
BigNumber = require('bignumber.js'),
POW_128 = new BigNumber(2).pow(128),
SEQ_ADJUST_MS = 2000

module.exports = function(shardCount, startIndex) {

startIndex = startIndex || 0

var i, shards = new Array(shardCount), shardHash = POW_128.div(shardCount).floor(),
createTime = Date.now() - SEQ_ADJUST_MS

for (i = 0; i < shardCount; i++) {
shards[i] = {
HashKeyRange: {
StartingHashKey: shardHash.times(i).toFixed(),
EndingHashKey: (i < shardCount - 1 ? shardHash.times(i + 1) : POW_128).minus(1).toFixed(),
},
SequenceNumberRange: {
StartingSequenceNumber: db.stringifySequence({shardCreateTime: createTime, shardIx: i}),
},
ShardId: db.shardIdName(startIndex + i),
}
}

return shards
}
3 changes: 2 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ var MAX_REQUEST_BYTES = 7 * 1024 * 1024
var validApis = ['Kinesis_20131202'],
validOperations = ['AddTagsToStream', 'CreateStream', 'DeleteStream', 'DescribeStream', 'GetRecords',
'GetShardIterator', 'ListStreams', 'ListTagsForStream', 'MergeShards', 'PutRecord', 'PutRecords',
'RemoveTagsFromStream', 'SplitShard', 'IncreaseStreamRetentionPeriod', 'DecreaseStreamRetentionPeriod'],
'RemoveTagsFromStream', 'SplitShard', 'IncreaseStreamRetentionPeriod', 'DecreaseStreamRetentionPeriod',
'UpdateShardCount', 'DescribeLimits'],
actions = {},
actionValidations = {}

Expand Down
24 changes: 24 additions & 0 deletions test/describeLimits.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
var helpers = require('./helpers')

var target = 'DescribeLimits',
request = helpers.request,
opts = helpers.opts.bind(null, target)

describe('describeLimits', function() {

describe('functionality', function() {

// note there is already a stream due to helpers.before

it('should return current account limits', function(done) {
request(opts({}), function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)
Object.keys(res.body).sort().should.eql(['OpenShardCount', 'ShardLimit'])
res.body.OpenShardCount.should.equal(3)
res.body.ShardLimit.should.equal(helpers.shardLimit)
done()
})
})
})
})
4 changes: 3 additions & 1 deletion test/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ exports.randomName = randomName
exports.waitUntilActive = waitUntilActive
exports.waitUntilDeleted = waitUntilDeleted
exports.testStream = randomName()
exports.createTestStreams = createTestStreams
exports.deleteTestStreams = deleteTestStreams
// For testing:
// exports.testStream = '__kinesalite_test_1'

Expand All @@ -39,7 +41,7 @@ var port = 10000 + Math.round(Math.random() * 10000),
{host: 'kinesis.' + exports.awsRegion + '.amazonaws.com', method: 'POST', ssl: true} :
{host: '127.0.0.1', port: port, method: 'POST'}

var kinesaliteServer = kinesalite({path: process.env.KINESALITE_PATH})
var kinesaliteServer = kinesalite({path: process.env.KINESALITE_PATH, shardLimit: exports.shardLimit})

before(function(done) {
this.timeout(200000)
Expand Down
Loading