Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CUMULUS-359 Add payload to one time rule created from kinesis stream #232

Merged
merged 13 commits into from
Mar 8, 2018
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Removed
- Unused queue lambda in api/lambdas [CUMULUS-359]

### Fixed
- Kinesis message content is passed to the triggered workflow [CUMULUS-359]
- Kinesis message queues a workflow message and does not write to rules table [CUMULUS-359]

## [v1.1.0] - 2018-03-05

### Added
Expand Down
1 change: 0 additions & 1 deletion packages/api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ exports.jobs = require('./lambdas/jobs');
exports.bootstrap = require('./lambdas/bootstrap').handler;
exports.scheduler = require('./lambdas/sf-scheduler');
exports.starter = require('./lambdas/sf-starter');
exports.queue = require('./lambdas/queue');
exports.kinesisConsumer = require('./lambdas/kinesis-consumer').handler;

const indexer = require('./es/indexer');
Expand Down
81 changes: 47 additions & 34 deletions packages/api/lambdas/kinesis-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
const Ajv = require('ajv');

const Rule = require('../models/rules');
const model = new Rule();
const messageSchema = require('./kinesis-consumer-event-schema.json');
const { queueWorkflowMessage } = require('@cumulus/ingest/queue');

/**
* `getKinesisRules` scans and returns DynamoDB rules table for enabled, 'kinesis'-type rules associated with the * collection declared in the event
* `getKinesisRules` scans and returns DynamoDB rules table for enabled,
* 'kinesis'-type rules associated with the * collection declared in the event
*
* @param {object} event lambda event
* @returns {array} List of zero or more rules found from table scan
* @param {Object} event - lambda event
* @returns {Array} List of zero or more rules found from table scan
*/
async function getKinesisRules(event) {
const collection = event.collection;
const model = new Rule();
const kinesisRules = await model.scan({
names: {
'#col': 'collection',
Expand All @@ -34,63 +36,74 @@ async function getKinesisRules(event) {
}

/**
* `createOneTimeRules` creates new rules with the same data as a kinesis-type rule, except the type is modified to 'onetime'.
* Queue a workflow message for the kinesis rule with the message passed
* to kinesis as the payload
*
* @param {array} kinesisRules list of rule objects
* @returns {array} Array of promises for model.create
* @param {Object} kinesisRule - kinesis rule to queue the message for
* @param {Object} eventObject - message passed to kinesis
* @returns {Promise} promise resolved when the message is queued
*/
async function createOneTimeRules(kinesisRules) {
const oneTimeRulePromises = kinesisRules.map((kinesisRule) => {
const oneTimeRuleParams = Object.assign({}, kinesisRule);
delete oneTimeRuleParams['createdAt'];
delete oneTimeRuleParams['updatedAt'];
oneTimeRuleParams.name = `${kinesisRule.name}_${Date.now().toString()}`;
oneTimeRuleParams.rule.type = 'onetime';
return model.create(oneTimeRuleParams);
});
async function queueMessageForRule(kinesisRule, eventObject) {
const item = {
workflow: kinesisRule.workflow,
provider: kinesisRule.provider,
collection: kinesisRule.collection,
payload: eventObject
};

return await Promise.all(oneTimeRulePromises);
return Rule.buildPayload(item)
.then(queueWorkflowMessage);
}

/**
* `validateMessage` validates an event as being valid for creating a workflow. See the messageSchema defined at
* the top of this file.
* `validateMessage` validates an event as being valid for creating a workflow.
* See the messageSchema defined at the top of this file.
*
* @param {object} event lambda event
* @returns {(error|object)} Throws an Ajv.ValidationError if event object is invalid. Returns the event object if event is valid.
* @param {Object} event - lambda event
* @returns {(error|Object)} Throws an Ajv.ValidationError if event object is invalid.
* Returns the event object if event is valid.
*/
async function validateMessage(event) {
const ajv = new Ajv({allErrors: true});
const ajv = new Ajv({ allErrors: true });
const validate = ajv.compile(messageSchema);
return await validate(event);
}

async function processRecord(record) {
/**
* Process data sent to a kinesis stream. Validate the data and
* queue a workflow message for each rule.
*
* @param {*} record - input to the kinesis stream
* @returns {[Promises]} Array of promises. Each promise is resolved when a
* message is queued for all associated kinesis rules.
*/
function processRecord(record) {
const dataBlob = record.kinesis.data;
const dataString = Buffer.from(dataBlob, 'base64').toString();
const eventObject = JSON.parse(dataString);

await validateMessage(eventObject)
return validateMessage(eventObject)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we no longer need await I don't think this function needs an async declaration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, thanks!

.then(getKinesisRules)
.then((kinesisRules) => {
return createOneTimeRules(kinesisRules);
});
.then((kinesisRules) => (
Promise.all(kinesisRules.map((kinesisRule) => queueMessageForRule(kinesisRule, eventObject)))
));
}

/**
* `handler` Looks up enabled 'kinesis'-type rules associated with the collection in the event argument. It
* creates new onetime rules for each rule found to trigger the workflow defined in the 'kinesis'-type rule.
* `handler` Looks up enabled 'kinesis'-type rules associated with the collection
* in the event argument. It enqueues a message for each kinesis-type rule to trigger
* the associated workflow.
*
* @param {*} event lambda event
* @param {*} context lambda context
* @param {*} cb callback function to explicitly return information back to the caller.
* @param {*} event - lambda event
* @param {*} context - lambda context
* @param {*} cb - callback function to explicitly return information back to the caller.
* @returns {(error|string)} Success message or error
*/
function handler(event, context, cb) {
const records = event.Records;

return Promise.all(records.map(r => processRecord(r)))
.then((results) => cb(null, results.filter(r => r !== undefined)))
return Promise.all(records.map(processRecord))
.then((results) => cb(null, results.filter((r) => r !== undefined)))
.catch((err) => {
cb(JSON.stringify(err));
});
Expand Down
34 changes: 0 additions & 34 deletions packages/api/lambdas/queue.js

This file was deleted.

5 changes: 5 additions & 0 deletions packages/api/tests/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"rules": {
"no-param-reassign": "off"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know off-hand why this is required? Seems like a good idea not to mutate parameters if we can avoid it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah in the tests setting t.context throws eslint errors. This eslintrc file is just for the tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should turn it off across the repo. Reassign params in java script is a very common trend.

}
}
Loading