-
Notifications
You must be signed in to change notification settings - Fork 1
/
lambda-stream-processor.js
66 lines (53 loc) · 1.48 KB
/
lambda-stream-processor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
'use strict';
// AWS SDK; only the DynamoDB DocumentClient is used in this module.
const AWS = require('aws-sdk');
// Single DocumentClient instance shared by every batchWrite call below.
const dynamoDB = new AWS.DynamoDB.DocumentClient();
// Name of the destination table, read from the TABLE_NAME environment variable.
const tableName = process.env.TABLE_NAME;
exports.handler = function(event, context, callback) {
const requestItems = buildRequestItems(event.Records);
const requests = buildRequests(requestItems);
Promise.all(requests)
.then(() => callback(null, `Delivered ${event.Records.length} records`))
.catch(callback);
};
/**
 * Converts stream records into DynamoDB put requests.
 *
 * Each record's `kinesis.data` is base64-decoded, parsed as JSON, and wrapped
 * as `{ PutRequest: { Item: <parsed value> } }`.
 *
 * @param {Array<{kinesis: {data: string}}>} records - Records whose
 *   `kinesis.data` is a base64 string holding a JSON document.
 * @returns {Array<{PutRequest: {Item: *}}>} One put request per record, in
 *   input order.
 * @throws {SyntaxError} If a decoded payload is not valid JSON.
 */
function buildRequestItems(records) {
  return records.map((record) => {
    // Decode as UTF-8. The 'ascii' encoding drops the high bit of every byte,
    // which silently corrupts any non-ASCII character in the payload.
    const json = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    return {
      PutRequest: {
        Item: JSON.parse(json),
      },
    };
  });
}
/**
 * Splits the request items into consecutive batches and starts one
 * `batchWrite` call per batch.
 *
 * The input array is not modified.
 *
 * @param {Array<Object>} requestItems - Put requests to write.
 * @returns {Array<Promise>} The value returned by `batchWrite` for each
 *   batch, in batch order; empty when `requestItems` is empty.
 */
function buildRequests(requestItems) {
  // DynamoDB BatchWriteItem accepts at most 25 put/delete requests per call.
  const batchSize = 25;
  const requests = [];
  for (let start = 0; start < requestItems.length; start += batchSize) {
    // slice() rather than splice() so the caller's array is left intact.
    const batch = requestItems.slice(start, start + batchSize);
    // Call with a single argument so `attempt` keeps its default of 0.
    requests.push(batchWrite(batch));
  }
  return requests;
}
/**
 * Writes one batch of requests to the table named by `tableName`, re-sending
 * any items DynamoDB reports as unprocessed with exponential backoff.
 *
 * NOTE: this function places no upper bound on the number of retry attempts.
 *
 * @param {Array<Object>} requestItems - Write requests for a single
 *   `batchWrite` call.
 * @param {number} [attempt=0] - Retry counter. 0 sends immediately; any
 *   higher value waits `50 * 2^attempt` milliseconds before sending.
 * @returns {Promise<undefined>} Resolves once a response reports no
 *   unprocessed items for the table; rejects with the error from the first
 *   failing call.
 */
function batchWrite(requestItems, attempt = 0) {
  const params = {
    RequestItems: {
      [tableName]: requestItems,
    },
  };
  const delay = attempt > 0 ? 50 * Math.pow(2, attempt) : 0;

  // Chain off the timer promise rather than calling the SDK inside a raw
  // setTimeout callback: anything thrown while issuing the request then
  // becomes a rejection instead of an uncaught exception.
  return new Promise((resolve) => {
    setTimeout(resolve, delay);
  })
    .then(() => dynamoDB.batchWrite(params).promise())
    .then((data) => {
      const unprocessed =
        data.UnprocessedItems && data.UnprocessedItems[tableName];
      // Retry only when there is something left to send; an empty list would
      // make the follow-up request invalid.
      if (unprocessed && unprocessed.length > 0) {
        return batchWrite(unprocessed, attempt + 1);
      }
      return undefined;
    });
}