Skip to content

Commit

Permalink
Merge pull request #232 from cumulus-nasa/CUMULUS-359
Browse files Browse the repository at this point in the history
CUMULUS-359 Add payload to one time rule created from kinesis stream
  • Loading branch information
laurenfrederick committed Mar 8, 2018
2 parents 78a2146 + 9411a38 commit a719897
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 137 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Removed
- Unused queue lambda in api/lambdas [CUMULUS-359]

### Fixed
- Kinesis message content is passed to the triggered workflow [CUMULUS-359]
- Kinesis message queues a workflow message and does not write to rules table [CUMULUS-359]

## [v1.1.0] - 2018-03-05

### Added
Expand Down
1 change: 0 additions & 1 deletion packages/api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ exports.jobs = require('./lambdas/jobs');
exports.bootstrap = require('./lambdas/bootstrap').handler;
exports.scheduler = require('./lambdas/sf-scheduler');
exports.starter = require('./lambdas/sf-starter');
exports.queue = require('./lambdas/queue');
exports.kinesisConsumer = require('./lambdas/kinesis-consumer').handler;

const indexer = require('./es/indexer');
Expand Down
81 changes: 47 additions & 34 deletions packages/api/lambdas/kinesis-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
const Ajv = require('ajv');

const Rule = require('../models/rules');
const model = new Rule();
const messageSchema = require('./kinesis-consumer-event-schema.json');
const { queueWorkflowMessage } = require('@cumulus/ingest/queue');

/**
* `getKinesisRules` scans and returns DynamoDB rules table for enabled, 'kinesis'-type rules associated with the * collection declared in the event
* `getKinesisRules` scans and returns DynamoDB rules table for enabled,
* 'kinesis'-type rules associated with the * collection declared in the event
*
* @param {object} event lambda event
* @returns {array} List of zero or more rules found from table scan
* @param {Object} event - lambda event
* @returns {Array} List of zero or more rules found from table scan
*/
async function getKinesisRules(event) {
const collection = event.collection;
const model = new Rule();
const kinesisRules = await model.scan({
names: {
'#col': 'collection',
Expand All @@ -34,63 +36,74 @@ async function getKinesisRules(event) {
}

/**
* `createOneTimeRules` creates new rules with the same data as a kinesis-type rule, except the type is modified to 'onetime'.
* Queue a workflow message for the kinesis rule with the message passed
* to kinesis as the payload
*
* @param {array} kinesisRules list of rule objects
* @returns {array} Array of promises for model.create
* @param {Object} kinesisRule - kinesis rule to queue the message for
* @param {Object} eventObject - message passed to kinesis
* @returns {Promise} promise resolved when the message is queued
*/
async function createOneTimeRules(kinesisRules) {
const oneTimeRulePromises = kinesisRules.map((kinesisRule) => {
const oneTimeRuleParams = Object.assign({}, kinesisRule);
delete oneTimeRuleParams['createdAt'];
delete oneTimeRuleParams['updatedAt'];
oneTimeRuleParams.name = `${kinesisRule.name}_${Date.now().toString()}`;
oneTimeRuleParams.rule.type = 'onetime';
return model.create(oneTimeRuleParams);
});
async function queueMessageForRule(kinesisRule, eventObject) {
const item = {
workflow: kinesisRule.workflow,
provider: kinesisRule.provider,
collection: kinesisRule.collection,
payload: eventObject
};

return await Promise.all(oneTimeRulePromises);
return Rule.buildPayload(item)
.then(queueWorkflowMessage);
}

/**
* `validateMessage` validates an event as being valid for creating a workflow. See the messageSchema defined at
* the top of this file.
* `validateMessage` validates an event as being valid for creating a workflow.
* See the messageSchema defined at the top of this file.
*
* @param {object} event lambda event
* @returns {(error|object)} Throws an Ajv.ValidationError if event object is invalid. Returns the event object if event is valid.
* @param {Object} event - lambda event
* @returns {(error|Object)} Throws an Ajv.ValidationError if event object is invalid.
* Returns the event object if event is valid.
*/
async function validateMessage(event) {
const ajv = new Ajv({allErrors: true});
const ajv = new Ajv({ allErrors: true });
const validate = ajv.compile(messageSchema);
return await validate(event);
}

async function processRecord(record) {
/**
* Process data sent to a kinesis stream. Validate the data and
* queue a workflow message for each rule.
*
* @param {*} record - input to the kinesis stream
* @returns {[Promises]} Array of promises. Each promise is resolved when a
* message is queued for all associated kinesis rules.
*/
function processRecord(record) {
const dataBlob = record.kinesis.data;
const dataString = Buffer.from(dataBlob, 'base64').toString();
const eventObject = JSON.parse(dataString);

await validateMessage(eventObject)
return validateMessage(eventObject)
.then(getKinesisRules)
.then((kinesisRules) => {
return createOneTimeRules(kinesisRules);
});
.then((kinesisRules) => (
Promise.all(kinesisRules.map((kinesisRule) => queueMessageForRule(kinesisRule, eventObject)))
));
}

/**
* `handler` Looks up enabled 'kinesis'-type rules associated with the collection in the event argument. It
* creates new onetime rules for each rule found to trigger the workflow defined in the 'kinesis'-type rule.
* `handler` Looks up enabled 'kinesis'-type rules associated with the collection
* in the event argument. It enqueues a message for each kinesis-type rule to trigger
* the associated workflow.
*
* @param {*} event lambda event
* @param {*} context lambda context
* @param {*} cb callback function to explicitly return information back to the caller.
* @param {*} event - lambda event
* @param {*} context - lambda context
* @param {*} cb - callback function to explicitly return information back to the caller.
* @returns {(error|string)} Success message or error
*/
function handler(event, context, cb) {
const records = event.Records;

return Promise.all(records.map(r => processRecord(r)))
.then((results) => cb(null, results.filter(r => r !== undefined)))
return Promise.all(records.map(processRecord))
.then((results) => cb(null, results.filter((r) => r !== undefined)))
.catch((err) => {
cb(JSON.stringify(err));
});
Expand Down
34 changes: 0 additions & 34 deletions packages/api/lambdas/queue.js

This file was deleted.

5 changes: 5 additions & 0 deletions packages/api/tests/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"rules": {
"no-param-reassign": "off"
}
}
Loading

0 comments on commit a719897

Please sign in to comment.