-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CUMULUS-359 Add payload to one time rule created from kinesis stream #232
Changes from 11 commits
422ff3d
4750e3f
6601642
2b9f948
48d3399
9087e27
e1343a0
bab2cf0
7c55d28
5a5619a
186dab2
67652c1
9411a38
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,17 +3,19 @@ | |
const Ajv = require('ajv'); | ||
|
||
const Rule = require('../models/rules'); | ||
const model = new Rule(); | ||
const messageSchema = require('./kinesis-consumer-event-schema.json'); | ||
const { queueWorkflowMessage } = require('@cumulus/ingest/queue'); | ||
|
||
/** | ||
* `getKinesisRules` scans and returns DynamoDB rules table for enabled, 'kinesis'-type rules associated with the * collection declared in the event | ||
* `getKinesisRules` scans and returns DynamoDB rules table for enabled, | ||
* 'kinesis'-type rules associated with the * collection declared in the event | ||
* | ||
* @param {object} event lambda event | ||
* @returns {array} List of zero or more rules found from table scan | ||
* @param {Object} event - lambda event | ||
* @returns {Array} List of zero or more rules found from table scan | ||
*/ | ||
async function getKinesisRules(event) { | ||
const collection = event.collection; | ||
const model = new Rule(); | ||
const kinesisRules = await model.scan({ | ||
names: { | ||
'#col': 'collection', | ||
|
@@ -34,63 +36,74 @@ async function getKinesisRules(event) { | |
} | ||
|
||
/** | ||
* `createOneTimeRules` creates new rules with the same data as a kinesis-type rule, except the type is modified to 'onetime'. | ||
* Queue a workflow message for the kinesis rule with the message passed | ||
* to kinesis as the payload | ||
* | ||
* @param {array} kinesisRules list of rule objects | ||
* @returns {array} Array of promises for model.create | ||
* @param {Object} kinesisRule - kinesis rule to queue the message for | ||
* @param {Object} eventObject - message passed to kinesis | ||
* @returns {Promise} promise resolved when the message is queued | ||
*/ | ||
async function createOneTimeRules(kinesisRules) { | ||
const oneTimeRulePromises = kinesisRules.map((kinesisRule) => { | ||
const oneTimeRuleParams = Object.assign({}, kinesisRule); | ||
delete oneTimeRuleParams['createdAt']; | ||
delete oneTimeRuleParams['updatedAt']; | ||
oneTimeRuleParams.name = `${kinesisRule.name}_${Date.now().toString()}`; | ||
oneTimeRuleParams.rule.type = 'onetime'; | ||
return model.create(oneTimeRuleParams); | ||
}); | ||
async function queueMessageForRule(kinesisRule, eventObject) { | ||
const item = { | ||
workflow: kinesisRule.workflow, | ||
provider: kinesisRule.provider, | ||
collection: kinesisRule.collection, | ||
payload: eventObject | ||
}; | ||
|
||
return await Promise.all(oneTimeRulePromises); | ||
return Rule.buildPayload(item) | ||
.then((payload) => queueWorkflowMessage(payload)); | ||
} | ||
|
||
/** | ||
* `validateMessage` validates an event as being valid for creating a workflow. See the messageSchema defined at | ||
* the top of this file. | ||
* `validateMessage` validates an event as being valid for creating a workflow. | ||
* See the messageSchema defined at the top of this file. | ||
* | ||
* @param {object} event lambda event | ||
* @returns {(error|object)} Throws an Ajv.ValidationError if event object is invalid. Returns the event object if event is valid. | ||
* @param {Object} event - lambda event | ||
* @returns {(error|Object)} Throws an Ajv.ValidationError if event object is invalid. | ||
* Returns the event object if event is valid. | ||
*/ | ||
async function validateMessage(event) { | ||
const ajv = new Ajv({allErrors: true}); | ||
const ajv = new Ajv({ allErrors: true }); | ||
const validate = ajv.compile(messageSchema); | ||
return await validate(event); | ||
} | ||
|
||
/** | ||
* Process data sent to a kinesis stream. Validate the data and | ||
* create rules. | ||
* | ||
* @param {*} record - input to the kinesis stream | ||
* @returns {[Promises]} Array of promises. Each promise is resolved when a | ||
* message is queued for all associated kinesis rules. | ||
*/ | ||
async function processRecord(record) { | ||
const dataBlob = record.kinesis.data; | ||
const dataString = Buffer.from(dataBlob, 'base64').toString(); | ||
const eventObject = JSON.parse(dataString); | ||
|
||
await validateMessage(eventObject) | ||
return validateMessage(eventObject) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we no longer need There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep, thanks! |
||
.then(getKinesisRules) | ||
.then((kinesisRules) => { | ||
return createOneTimeRules(kinesisRules); | ||
}); | ||
.then((kinesisRules) => ( | ||
Promise.all(kinesisRules.map((kinesisRule) => queueMessageForRule(kinesisRule, eventObject))) | ||
)); | ||
} | ||
|
||
/** | ||
* `handler` Looks up enabled 'kinesis'-type rules associated with the collection in the event argument. It | ||
* creates new onetime rules for each rule found to trigger the workflow defined in the 'kinesis'-type rule. | ||
* `handler` Looks up enabled 'kinesis'-type rules associated with the collection | ||
* in the event argument. It creates new onetime rules for each rule found to trigger | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment is out of date - There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, thank you |
||
* the workflow defined in the 'kinesis'-type rule. | ||
* | ||
* @param {*} event lambda event | ||
* @param {*} context lambda context | ||
* @param {*} cb callback function to explicitly return information back to the caller. | ||
* @param {*} event - lambda event | ||
* @param {*} context - lambda context | ||
* @param {*} cb - callback function to explicitly return information back to the caller. | ||
* @returns {(error|string)} Success message or error | ||
*/ | ||
function handler(event, context, cb) { | ||
const records = event.Records; | ||
|
||
return Promise.all(records.map(r => processRecord(r))) | ||
.then((results) => cb(null, results.filter(r => r !== undefined))) | ||
return Promise.all(records.map(processRecord)) | ||
.then((results) => cb(null, results.filter((r) => r !== undefined))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another minor style thing, you can omit the parens around results when the function with one argument falls on a single line (following https://github.com/airbnb/javascript#arrows--one-arg-parens) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ESLint is complaining so I just left it. |
||
.catch((err) => { | ||
cb(JSON.stringify(err)); | ||
}); | ||
|
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{ | ||
"rules": { | ||
"no-param-reassign": "off" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you know off-hand why this is required? Seems like a good idea not to mutate parameters if we can avoid it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah in the tests setting t.context throws eslint errors. This eslintrc file is just for the tests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should turn it off across the repo. Reassign params in java script is a very common trend. |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry missed this the first time around, this could be:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, thanks