-
Notifications
You must be signed in to change notification settings - Fork 106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CUMULUS-359 Add payload to one time rule created from kinesis stream #232
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will update Changelog after current release is published
*/ | ||
async function createOneTimeRules(kinesisRules) { | ||
async function createOneTimeRules(kinesisRules, eventObject) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not scalable at its current form. We should update this function to only generate the cumulus message and pass it to the SF starter queue for each incoming message.
…he kinesis message
* create rules. | ||
* | ||
* @param {*} record - input to the kinesis stream | ||
* @returns {Promise} promise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit - could be more specific
@returns {[Promises]} Array of promises. Each promise is resolved when a message is queued for all associated kinesis rules.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, thanks! Think I meant to come back to that one.
* @returns {(error|string)} Success message or error | ||
*/ | ||
function handler(event, context, cb) { | ||
const records = event.Records; | ||
|
||
return Promise.all(records.map(r => processRecord(r))) | ||
.then((results) => cb(null, results.filter(r => r !== undefined))) | ||
return Promise.all(records.map((r) => processRecord(r))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can make this even terser - but it's a style preference:
return Promise.all(records.map(processRecord))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
return Promise.all(records.map(r => processRecord(r))) | ||
.then((results) => cb(null, results.filter(r => r !== undefined))) | ||
return Promise.all(records.map((r) => processRecord(r))) | ||
.then((results) => cb(null, results.filter((r) => r !== undefined))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another minor style thing, you can omit the parens around results when the function with one argument falls on a single line (following https://github.com/airbnb/javascript#arrows--one-arg-parens)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ESLint is complaining so I just left it.
@@ -1,34 +0,0 @@ | |||
'use strict'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice finding some dead code and removing it! 💯
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well Alireza confirmed it was dead and Marc kinda found it, so I can only take credit for the actual physical removal.
packages/ingest/queue.js
Outdated
message.payload = { pdr }; | ||
message.payload = { | ||
pdr | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why break this into 3 lines?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No idea.. don't remember touching that code! Fixed.
packages/ingest/queue.js
Outdated
if (message.resources) { | ||
queueUrl = message.resources.queues.startSF; | ||
} | ||
else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
else should be on the line above, e.g.:
} else {
https://github.com/airbnb/javascript#blocks--cuddled-elses
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is removed anyway.
packages/ingest/queue.js
Outdated
|
||
let queueUrl = null; | ||
|
||
if (message.resources) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there documentation around when this is message.resources and when it's message.meta?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was from when I ported it over and never removed it. So I removed that entire block.
]); | ||
results.Items.forEach(r => t.is(r.rule.type, 'onetime')); | ||
}); | ||
test.serial('it should create a onetime rule for each associated workflow', async (t) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need the serial option here? I thought we were already running tests in serial
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put that in when debugging and now have removed it. You're right, nice catch.
}) | ||
); | ||
|
||
t.context.tableName = randomString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to use random strings and regenerate these for every test? Since these are constants it makes more sense to me to define them outside of the before block and I don't really see a reason for having them be randomly generated since we statically define them in our deployments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was having a lot of issues with concurrency it seemed like and had the hardest time getting the queueing tests to work. This seemed to help.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see - ya the concurrency in ava can be frustrating. I was hoping use --serial would get around these types of issues entirely. :/
* @param {Object} event - event to queue with workflow and payload info | ||
* @returns {Promise} - resolves when the message has been enqueued | ||
*/ | ||
async function queueWorkflowMessage(event) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice if we had a test for this function specifically, but wouldn't want to block this PR for that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope you're right and it really wasn't a big deal to add it actually. Thanks!
]); | ||
results.Items.forEach(r => t.is(r.rule.type, 'onetime')); | ||
}); | ||
test.serial('it should create a onetime rule for each associated workflow', async (t) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also this test description needs updating, e.g.
test('it should enqueue a message for each associated workflow', async (t) => {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you!!! Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made some minor comments but none of them are really blocking, let me know when you have taken a look. Perhaps also @scisco wants to have another look.
|
||
return await Promise.all(oneTimeRulePromises); | ||
return Rule.buildPayload(item) | ||
.then((payload) => queueWorkflowMessage(payload)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry missed this the first time around, this could be:
return Rule.buildPayload(item).then(queueWorkflowMessage);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, thanks
* `handler` Looks up enabled 'kinesis'-type rules associated with the collection in the event argument. It | ||
* creates new onetime rules for each rule found to trigger the workflow defined in the 'kinesis'-type rule. | ||
* `handler` Looks up enabled 'kinesis'-type rules associated with the collection | ||
* in the event argument. It creates new onetime rules for each rule found to trigger |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is out of date - It creates new onetime rules for each rule
-> It enqueues a message for each kinesis-type rule to trigger the associated workflow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, thank you
@@ -0,0 +1,5 @@ | |||
{ | |||
"rules": { | |||
"no-param-reassign": "off" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you know off-hand why this is required? Seems like a good idea not to mutate parameters if we can avoid it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah in the tests setting t.context throws eslint errors. This eslintrc file is just for the tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should turn it off across the repo. Reassign params in java script is a very common trend.
|
||
const model = new Rule(t.context.tableName); | ||
await manager.createTable(t.context.tableName, ruleTableParams); | ||
await Promise.all([rule1Params, rule2Params, disabledRuleParams] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Promise.all([rule1Params, rule2Params, disabledRuleParams].map(model.create));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually get all kinds of errors when I do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh interesting - probably that shorthand only works for promises, not iterators. Sorry for the misleading comment.
]); | ||
}); | ||
|
||
test('the queue receives a correctly formatted workflow message', async (t) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💯
async function processRecord(record) { | ||
const dataBlob = record.kinesis.data; | ||
const dataString = Buffer.from(dataBlob, 'base64').toString(); | ||
const eventObject = JSON.parse(dataString); | ||
|
||
await validateMessage(eventObject) | ||
return validateMessage(eventObject) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we no longer need await
I don't think this function needs an async
declaration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made a few more minor comments, non-blocking. Thanks @laurenfrederick !!!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👏 💯
Summary: Added payload to the rule created for the kinesis message to pass the message through to the workflow
Addresses CUMULUS-359: Kinesis triggered workflows do not retain any of the triggering message's body
Changes
Test Plan
Things that should succeed before merging.
Reviewers: @abarciauskas-bgse @jennyhliu