-
Notifications
You must be signed in to change notification settings - Fork 77
/
metering-hourly-job.js
65 lines (53 loc) · 2.19 KB
/
metering-hourly-job.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
const AWS = require('aws-sdk');
const { AWS_REGION: aws_region } = process.env;
const dynamodb = new AWS.DynamoDB({ apiVersion: '2012-08-10', region: aws_region });
const sqs = new AWS.SQS({ apiVersion: '2012-11-05', region: aws_region });
const { SQSMeteringRecordsUrl: QueueUrl, AWSMarketplaceMeteringRecordsTableName: AWSMarketplaceMeteringRecordsTableName } = process.env;
async function asyncForEach(array, callback) {
for (let index = 0; index < array.length; index++) {
await callback(array[index], index, array)
}
}
const addUpDimensions = (objectArray) => Object.values(objectArray.reduce((accumulator, currentValue) => (
(accumulator[currentValue.dimension]
? (accumulator[currentValue.dimension].value += currentValue.value)
: accumulator[currentValue.dimension] = { ...currentValue }
), accumulator), {}));
exports.job = async () => {
const params = {
TableName: AWSMarketplaceMeteringRecordsTableName,
IndexName: 'PendingMeteringRecordsIndex',
KeyConditionExpression: 'metering_pending = :b',
ExpressionAttributeValues: {
':b': { S: 'true' },
},
};
const result = await dynamodb.query(params).promise();
const items = result.Items.map((i) => AWS.DynamoDB.Converter.unmarshall(i));
const hashMap = {};
items.map((item) => {
const { customerIdentifier } = item;
if (hashMap[customerIdentifier]) {
hashMap[customerIdentifier].create_timestamps.push(item.create_timestamp);
hashMap[customerIdentifier].dimension_usage = addUpDimensions([...hashMap[customerIdentifier].dimension_usage, ...item.dimension_usage]);
} else {
hashMap[customerIdentifier] = item;
hashMap[customerIdentifier].create_timestamps = [item.create_timestamp];
delete hashMap[customerIdentifier].create_timestamp;
}
});
await asyncForEach(Object.keys(hashMap), async (hash) => {
const SQSParams = {
MessageBody: JSON.stringify(hashMap[hash]),
MessageGroupId: hash,
QueueUrl,
};
try {
await sqs.sendMessage(SQSParams).promise();
console.log(`Records submitted to queue: ${JSON.stringify(hashMap[hash])}`);
} catch (error) {
console.error(error, error.stack);
}
});
return true;
};