From 422ff3d9ef1ddeb4a9b282cbe4775d69a43c4a43 Mon Sep 17 00:00:00 2001 From: Lauren Frederick Date: Mon, 5 Mar 2018 15:33:37 -0500 Subject: [PATCH 1/9] CUMULUS-359 Add payload to one time rule created from kinesis stream --- packages/api/lambdas/kinesis-consumer.js | 53 ++++++++++------- packages/api/tests/test-kinesis-consumer.js | 66 ++++++++++++--------- 2 files changed, 71 insertions(+), 48 deletions(-) diff --git a/packages/api/lambdas/kinesis-consumer.js b/packages/api/lambdas/kinesis-consumer.js index 15f203c25b5..3d61478ab80 100644 --- a/packages/api/lambdas/kinesis-consumer.js +++ b/packages/api/lambdas/kinesis-consumer.js @@ -7,10 +7,11 @@ const model = new Rule(); const messageSchema = require('./kinesis-consumer-event-schema.json'); /** - * `getKinesisRules` scans and returns DynamoDB rules table for enabled, 'kinesis'-type rules associated with the * collection declared in the event + * `getKinesisRules` scans and returns DynamoDB rules table for enabled, + * 'kinesis'-type rules associated with the * collection declared in the event * - * @param {object} event lambda event - * @returns {array} List of zero or more rules found from table scan + * @param {Object} event - lambda event + * @returns {Array} List of zero or more rules found from table scan */ async function getKinesisRules(event) { const collection = event.collection; @@ -34,18 +35,21 @@ async function getKinesisRules(event) { } /** - * `createOneTimeRules` creates new rules with the same data as a kinesis-type rule, except the type is modified to 'onetime'. + * `createOneTimeRules` creates new rules with the same data as a kinesis-type rule, + * except the type is modified to 'onetime'. * - * @param {array} kinesisRules list of rule objects - * @returns {array} Array of promises for model.create + * @param {Array} kinesisRules - list of rule objects + * @param {Object} eventObject - kinesis message input + * @returns {Array} Array of promises for model.create */ -async function createOneTimeRules(kinesisRules) { +async function createOneTimeRules(kinesisRules, eventObject) { const oneTimeRulePromises = kinesisRules.map((kinesisRule) => { const oneTimeRuleParams = Object.assign({}, kinesisRule); delete oneTimeRuleParams['createdAt']; delete oneTimeRuleParams['updatedAt']; oneTimeRuleParams.name = `${kinesisRule.name}_${Date.now().toString()}`; oneTimeRuleParams.rule.type = 'onetime'; + oneTimeRuleParams.payload = eventObject; return model.create(oneTimeRuleParams); }); @@ -53,18 +57,26 @@ async function createOneTimeRules(kinesisRules) { } /** - * `validateMessage` validates an event as being valid for creating a workflow. See the messageSchema defined at - * the top of this file. + * `validateMessage` validates an event as being valid for creating a workflow. + * See the messageSchema defined at the top of this file. * - * @param {object} event lambda event - * @returns {(error|object)} Throws an Ajv.ValidationError if event object is invalid. Returns the event object if event is valid. + * @param {Object} event - lambda event + * @returns {(error|Object)} Throws an Ajv.ValidationError if event object is invalid. + * Returns the event object if event is valid. */ async function validateMessage(event) { - const ajv = new Ajv({allErrors: true}); + const ajv = new Ajv({ allErrors: true }); const validate = ajv.compile(messageSchema); return await validate(event); } +/** + * Process data sent to a kinesis stream. Validate the data and + * create rules. + * + * @param {*} record - input to the kinesis stream + * @returns {Promise} promise + */ async function processRecord(record) { const dataBlob = record.kinesis.data; const dataString = Buffer.from(dataBlob, 'base64').toString(); @@ -73,24 +85,25 @@ async function processRecord(record) { await validateMessage(eventObject) .then(getKinesisRules) .then((kinesisRules) => { - return createOneTimeRules(kinesisRules); + return createOneTimeRules(kinesisRules, eventObject); }); } /** - * `handler` Looks up enabled 'kinesis'-type rules associated with the collection in the event argument. It - * creates new onetime rules for each rule found to trigger the workflow defined in the 'kinesis'-type rule. + * `handler` Looks up enabled 'kinesis'-type rules associated with the collection + * in the event argument. It creates new onetime rules for each rule found to trigger + * the workflow defined in the 'kinesis'-type rule. * - * @param {*} event lambda event - * @param {*} context lambda context - * @param {*} cb callback function to explicitly return information back to the caller. + * @param {*} event - lambda event + * @param {*} context - lambda context + * @param {*} cb - callback function to explicitly return information back to the caller. * @returns {(error|string)} Success message or error */ function handler(event, context, cb) { const records = event.Records; - return Promise.all(records.map(r => processRecord(r))) - .then((results) => cb(null, results.filter(r => r !== undefined))) + return Promise.all(records.map((r) => processRecord(r))) + .then((results) => cb(null, results.filter((r) => r !== undefined))) .catch((err) => { cb(JSON.stringify(err)); }); diff --git a/packages/api/tests/test-kinesis-consumer.js b/packages/api/tests/test-kinesis-consumer.js index 254c5e7363b..c7493ea6500 100644 --- a/packages/api/tests/test-kinesis-consumer.js +++ b/packages/api/tests/test-kinesis-consumer.js @@ -26,8 +26,8 @@ const eventData = JSON.stringify({ const event = { Records: [ - {kinesis: {data: new Buffer(eventData).toString('base64')}}, - {kinesis: {data: new Buffer(eventData).toString('base64')}} + { kinesis: { data: new Buffer(eventData).toString('base64') } }, + { kinesis: { data: new Buffer(eventData).toString('base64') } } ] }; @@ -59,10 +59,17 @@ const disabledRuleParams = Object.assign({}, commonRuleParams, { state: 'DISABLED' }); +/** + * Callback used for testing + * + * @param {*} err - error + * @param {Object} object - object + * @returns {Object} object, if no error is thrown + */ function testCallback(err, object) { if (err) throw err; return object; -}; +} test.before(async () => { sinon.stub(Rule, 'buildPayload').resolves(true); @@ -74,17 +81,17 @@ test.after.always(async () => { }); // getKinesisRule tests -test('it should look up kinesis-type rules which are associated with the collection, but not those that are disabled', t => { - return Promise.all([rule1Params, rule2Params, disabledRuleParams].map(x => model.create(x))) - .then(() => { - return getKinesisRules(JSON.parse(eventData)) - }).then((result) => { +// eslint-disable-next-line max-len +test('it should look up kinesis-type rules which are associated with the collection, but not those that are disabled', (t) => + Promise.all([rule1Params, rule2Params, disabledRuleParams].map((x) => model.create(x))) + .then(() => getKinesisRules(JSON.parse(eventData))) + .then((result) => { t.is(result.length, 2); - }); -}); + }) +); // handler tests -test('it should create a onetime rule for each associated workflow', async t => { +test('it should create a onetime rule for each associated workflow', async (t) => { await handler(event, {}, testCallback).then(() => { return model.scan({ names: { @@ -104,23 +111,26 @@ test('it should create a onetime rule for each associated workflow', async t => }) .then((results) => { t.is(results.Items.length, 4); - const workflowNames = results.Items.map(i => i.workflow).sort(); + + const workflowNames = results.Items.map((i) => i.workflow).sort(); t.deepEqual(workflowNames, [ 'test-workflow-1', 'test-workflow-1', 'test-workflow-2', 'test-workflow-2' ]); - results.Items.forEach(r => t.is(r.rule.type, 'onetime')); - }); + results.Items.forEach((r) => t.is(r.rule.type, 'onetime')); + + results.Items.forEach((r) => t.deepEqual({ collection: 'test-collection' }, r.payload)); + }); }); -test('it should throw an error if message does not include a collection', t => { +test('it should throw an error if message does not include a collection', (t) => { const invalidMessage = JSON.stringify({}); - const event = { - Records: [{kinesis: {data: new Buffer(invalidMessage).toString('base64')}}] + const kinesisEvent = { + Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] }; - return handler(event, {}, testCallback) + return handler(kinesisEvent, {}, testCallback) .catch((err) => { const errObject = JSON.parse(err); t.is(errObject.errors[0].dataPath, ''); @@ -128,12 +138,12 @@ test('it should throw an error if message does not include a collection', t => { }); }); -test('it should throw an error if message collection has wrong data type', t => { - const invalidMessage = JSON.stringify({collection: {}}); - const event = { - Records: [{kinesis: {data: new Buffer(invalidMessage).toString('base64')}}] +test('it should throw an error if message collection has wrong data type', (t) => { + const invalidMessage = JSON.stringify({ collection: {} }); + const kinesisEvent = { + Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] }; - return handler(event, {}, testCallback) + return handler(kinesisEvent, {}, testCallback) .catch((err) => { const errObject = JSON.parse(err); t.is(errObject.errors[0].dataPath, '.collection'); @@ -141,10 +151,10 @@ test('it should throw an error if message collection has wrong data type', t => }); }); -test('it should not throw if message is valid', t => { - const validMessage = JSON.stringify({collection: 'confection-collection'}); - const event = { - Records: [{kinesis: {data: new Buffer(validMessage).toString('base64')}}] +test('it should not throw if message is valid', (t) => { + const validMessage = JSON.stringify({ collection: 'confection-collection' }); + const kinesisEvent = { + Records: [{ kinesis: { data: new Buffer(validMessage).toString('base64') } }] }; - return handler(event, {}, testCallback).then(r => t.deepEqual(r, [])); + return handler(kinesisEvent, {}, testCallback).then((r) => t.deepEqual(r, [])); }); From 2b9f9482107df1cc9a766264806eef164041263a Mon Sep 17 00:00:00 2001 From: Lauren Frederick Date: Mon, 5 Mar 2018 16:03:51 -0500 Subject: [PATCH 2/9] CUMULUS-359 Update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 135046f382a..42509a110b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Fixed +- Kinesis message content is passed to the triggered workflow [CUMULUS-359] + ## [v1.1.0] - 2018-03-05 ### Added From 9087e276825e99d1234cb5db76399a9a27ee2953 Mon Sep 17 00:00:00 2001 From: Lauren Frederick Date: Wed, 7 Mar 2018 09:53:57 -0500 Subject: [PATCH 3/9] CUMULUS-359 Queue a workflow message instead of creating a rule for the kinesis message --- packages/api/lambdas/kinesis-consumer.js | 39 +++-- packages/api/lambdas/queue.js | 39 ++--- packages/api/tests/test-kinesis-consumer.js | 159 ++++++++++++-------- packages/ingest/queue.js | 61 +++++++- 4 files changed, 179 insertions(+), 119 deletions(-) diff --git a/packages/api/lambdas/kinesis-consumer.js b/packages/api/lambdas/kinesis-consumer.js index 3d61478ab80..338b954bfd9 100644 --- a/packages/api/lambdas/kinesis-consumer.js +++ b/packages/api/lambdas/kinesis-consumer.js @@ -5,6 +5,7 @@ const Ajv = require('ajv'); const Rule = require('../models/rules'); const model = new Rule(); const messageSchema = require('./kinesis-consumer-event-schema.json'); +const { queueWorkflowMessage } = require('@cumulus/ingest/queue'); /** * `getKinesisRules` scans and returns DynamoDB rules table for enabled, @@ -35,25 +36,23 @@ async function getKinesisRules(event) { } /** - * `createOneTimeRules` creates new rules with the same data as a kinesis-type rule, - * except the type is modified to 'onetime'. - * - * @param {Array} kinesisRules - list of rule objects - * @param {Object} eventObject - kinesis message input - * @returns {Array} Array of promises for model.create + * Queue a workflow message for the kinesis rule with the message passed + * to kinesis as the payload + * + * @param {*} kinesisRule - kinesis rule to queue the message for + * @param {*} eventObject - message passed to kinesis + * @param {Promise} - promise resolved when the message is queued */ -async function createOneTimeRules(kinesisRules, eventObject) { - const oneTimeRulePromises = kinesisRules.map((kinesisRule) => { - const oneTimeRuleParams = Object.assign({}, kinesisRule); - delete oneTimeRuleParams['createdAt']; - delete oneTimeRuleParams['updatedAt']; - oneTimeRuleParams.name = `${kinesisRule.name}_${Date.now().toString()}`; - oneTimeRuleParams.rule.type = 'onetime'; - oneTimeRuleParams.payload = eventObject; - return model.create(oneTimeRuleParams); - }); +async function queueMessageForRule(kinesisRule, eventObject) { + const item = { + workflow: kinesisRule.workflow, + provider: kinesisRule.provider, + collection: kinesisRule.collection, + payload: eventObject + }; - return await Promise.all(oneTimeRulePromises); + return Rule.buildPayload(item) + .then((payload) => queueWorkflowMessage(payload)); } /** @@ -84,9 +83,9 @@ async function processRecord(record) { await validateMessage(eventObject) .then(getKinesisRules) - .then((kinesisRules) => { - return createOneTimeRules(kinesisRules, eventObject); - }); + .then((kinesisRules) => ( + kinesisRules.map((kinesisRule) => queueMessageForRule(kinesisRule, eventObject)) + )); } /** diff --git a/packages/api/lambdas/queue.js b/packages/api/lambdas/queue.js index f8dc5c373f5..c8834a52725 100644 --- a/packages/api/lambdas/queue.js +++ b/packages/api/lambdas/queue.js @@ -1,34 +1,17 @@ 'use strict'; -const get = require('lodash.get'); -const aws = require('@cumulus/ingest/aws'); -const uuidv4 = require('uuid/v4'); +const { queueWorkflowMessage } = require('@cumulus/ingest/queue'); +/** + * Handler for queue lambda task + * + * @param {*} event + * @param {*} context + * @param {*} cb - callback + */ function handler(event, context, cb) { - const template = get(event, 'template'); - const provider = get(event, 'provider', {}); - const meta = get(event, 'meta', {}); - const collection = get(event, 'collection', {}); - const payload = get(event, 'payload', {}); - - const parsed = aws.S3.parseS3Uri(template); - aws.S3.get(parsed.Bucket, parsed.Key).then((data) => { - const message = JSON.parse(data.Body); - message.provider = provider; - message.meta = meta; - message.payload = payload; - message.cumulus_meta.execution_name = uuidv4(); - - if (collection) { - message.collection = { - id: collection.name, - meta: collection - }; - } - - return aws.SQS.sendMessage(message.resources.queues.startSF, message) - .then((r) => cb(null, r)); - }) - .catch(cb); + return queueWorkflowMessage(event) + .then((r) => cb(null, r)) + .catch(cb); } module.exports = handler; diff --git a/packages/api/tests/test-kinesis-consumer.js b/packages/api/tests/test-kinesis-consumer.js index c7493ea6500..120629651c6 100644 --- a/packages/api/tests/test-kinesis-consumer.js +++ b/packages/api/tests/test-kinesis-consumer.js @@ -2,6 +2,9 @@ const test = require('ava'); const sinon = require('sinon'); +const get = require('lodash.get'); +const { sqs, s3, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws'); +const { createQueue, randomString } = require('@cumulus/common/test-utils'); const tableName = 'rule'; process.env.RulesTable = tableName; @@ -9,6 +12,7 @@ process.env.stackName = 'test-stack'; process.env.bucket = 'test-bucket'; process.env.kinesisConsumer = 'test-kinesisConsumer'; const { getKinesisRules, handler } = require('../lambdas/kinesis-consumer'); +const queue = require('../lambdas/queue'); const manager = require('../models/base'); const Rule = require('../models/rules'); const model = new Rule(); @@ -36,6 +40,7 @@ const commonRuleParams = { name: testCollectionName, version: '0.0.0' }, + provider: 'PROV1', rule: { type: 'kinesis', value: 'test-kinesisarn' @@ -67,19 +72,63 @@ const disabledRuleParams = Object.assign({}, commonRuleParams, { * @returns {Object} object, if no error is thrown */ function testCallback(err, object) { - if (err) throw err; + if (err) { + console.log('ERROR: ' + err); + throw err; + } return object; } test.before(async () => { - sinon.stub(Rule, 'buildPayload').resolves(true); await manager.createTable(tableName, ruleTableParams); }); +test.beforeEach(async (t) => { + t.context.templateBucket = randomString(); + await s3().createBucket({ Bucket: t.context.templateBucket }).promise(); + + t.context.stateMachineArn = randomString(); + + t.context.queueUrl = await createQueue(); + + console.log(t.context.queueUrl); + + t.context.messageTemplate = { + cumulus_meta: { + state_machine: t.context.stateMachineArn + }, + meta: { queues: { startSF: t.context.queueUrl } } + }; + const messageTemplateKey = `${randomString()}/template.json`; + await s3().putObject({ + Bucket: t.context.templateBucket, + Key: messageTemplateKey, + Body: JSON.stringify(t.context.messageTemplate) + }).promise(); + + sinon.stub(Rule, 'buildPayload').callsFake(async (item) => { + return { + template: `s3://${t.context.templateBucket}/${messageTemplateKey}`, + provider: item.provider, + collection: item.collection, + meta: get(item, 'meta', {}), + payload: get(item, 'payload', {}) + }; + }); +}); + test.after.always(async () => { await manager.deleteTable(tableName); }); +test.afterEach(async (t) => { + await Promise.all([ + recursivelyDeleteS3Bucket(t.context.templateBucket), + sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise() + ]); + sinon.restore(Rule.buildPayload); +}); + // getKinesisRule tests // eslint-disable-next-line max-len test('it should look up kinesis-type rules which are associated with the collection, but not those that are disabled', (t) => @@ -92,69 +141,49 @@ test('it should look up kinesis-type rules which are associated with the collect // handler tests test('it should create a onetime rule for each associated workflow', async (t) => { - await handler(event, {}, testCallback).then(() => { - return model.scan({ - names: { - '#col': 'collection', - '#nm': 'name', - '#st': 'state', - '#rl': 'rule', - '#tp': 'type' - }, - filter: '#st = :enabledState AND #col.#nm = :collectionName AND #rl.#tp = :ruleType', - values: { - ':enabledState': 'ENABLED', - ':collectionName': testCollectionName, - ':ruleType': 'onetime' - } + await handler(event, {}, testCallback); + await getKinesisRules(JSON.parse(eventData)); + await sqs().receiveMessage({ + QueueUrl: t.context.queueUrl, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 1 + }).promise() + .then((receiveMessageResponse) => { + console.log(receiveMessageResponse); + t.is(receiveMessageResponse.Messages.length, 4); }); - }) - .then((results) => { - t.is(results.Items.length, 4); - - const workflowNames = results.Items.map((i) => i.workflow).sort(); - t.deepEqual(workflowNames, [ - 'test-workflow-1', - 'test-workflow-1', - 'test-workflow-2', - 'test-workflow-2' - ]); - results.Items.forEach((r) => t.is(r.rule.type, 'onetime')); - - results.Items.forEach((r) => t.deepEqual({ collection: 'test-collection' }, r.payload)); - }); }); -test('it should throw an error if message does not include a collection', (t) => { - const invalidMessage = JSON.stringify({}); - const kinesisEvent = { - Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] - }; - return handler(kinesisEvent, {}, testCallback) - .catch((err) => { - const errObject = JSON.parse(err); - t.is(errObject.errors[0].dataPath, ''); - t.is(errObject.errors[0].message, 'should have required property \'collection\''); - }); -}); - -test('it should throw an error if message collection has wrong data type', (t) => { - const invalidMessage = JSON.stringify({ collection: {} }); - const kinesisEvent = { - Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] - }; - return handler(kinesisEvent, {}, testCallback) - .catch((err) => { - const errObject = JSON.parse(err); - t.is(errObject.errors[0].dataPath, '.collection'); - t.is(errObject.errors[0].message, 'should be string'); - }); -}); - -test('it should not throw if message is valid', (t) => { - const validMessage = JSON.stringify({ collection: 'confection-collection' }); - const kinesisEvent = { - Records: [{ kinesis: { data: new Buffer(validMessage).toString('base64') } }] - }; - return handler(kinesisEvent, {}, testCallback).then((r) => t.deepEqual(r, [])); -}); +// test('it should throw an error if message does not include a collection', (t) => { +// const invalidMessage = JSON.stringify({}); +// const kinesisEvent = { +// Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] +// }; +// return handler(kinesisEvent, {}, testCallback) +// .catch((err) => { +// const errObject = JSON.parse(err); +// t.is(errObject.errors[0].dataPath, ''); +// t.is(errObject.errors[0].message, 'should have required property \'collection\''); +// }); +// }); + +// test('it should throw an error if message collection has wrong data type', (t) => { +// const invalidMessage = JSON.stringify({ collection: {} }); +// const kinesisEvent = { +// Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] +// }; +// return handler(kinesisEvent, {}, testCallback) +// .catch((err) => { +// const errObject = JSON.parse(err); +// t.is(errObject.errors[0].dataPath, '.collection'); +// t.is(errObject.errors[0].message, 'should be string'); +// }); +// }); + +// test('it should not throw if message is valid', (t) => { +// const validMessage = JSON.stringify({ collection: 'confection-collection' }); +// const kinesisEvent = { +// Records: [{ kinesis: { data: new Buffer(validMessage).toString('base64') } }] +// }; +// return handler(kinesisEvent, {}, testCallback).then((r) => t.deepEqual(r, [])); +// }); diff --git a/packages/ingest/queue.js b/packages/ingest/queue.js index 7990c27120f..1666d51191f 100644 --- a/packages/ingest/queue.js +++ b/packages/ingest/queue.js @@ -5,13 +5,15 @@ const { sendSQSMessage, parseS3Uri } = require('@cumulus/common/aws'); +const get = require('lodash.get'); +const uuidv4 = require('uuid/v4'); /** - * Create a message from a template stored on S3 - * - * @param {string} templateUri - S3 uri to the workflow template - * @returns {Promise} message object - **/ + * Create a message from a template stored on S3 + * + * @param {string} templateUri - S3 uri to the workflow template + * @returns {Promise} message object + **/ async function getMessageFromTemplate(templateUri) { const parsedS3Uri = parseS3Uri(templateUri); const data = await getS3Object(parsedS3Uri.Bucket, parsedS3Uri.Key); @@ -41,7 +43,9 @@ async function enqueueParsePdrMessage( message.meta.provider = provider; message.meta.collection = collection; - message.payload = { pdr }; + message.payload = { + pdr + }; return sendSQSMessage(queueUrl, message); } @@ -85,3 +89,48 @@ async function enqueueGranuleIngestMessage( return sendSQSMessage(queueUrl, message); } exports.enqueueGranuleIngestMessage = enqueueGranuleIngestMessage; + +/** + * Queue a workflow to be picked up by SF starter + * + * @param {*} event - event to queue with workflow and payload info + */ +async function queueWorkflowMessage(event) { + const template = get(event, 'template'); + const provider = get(event, 'provider', {}); + const collection = get(event, 'collection', {}); + const payload = get(event, 'payload', {}); + + const message = await getMessageFromTemplate(template); + + let queueUrl = null; + + if (message.resources) { + queueUrl = message.resources.queues.startSF; + } + else { + queueUrl = message.meta.queues.startSF; + } + + message.provider = provider; + message.payload = payload; + message.cumulus_meta.execution_name = uuidv4(); + + if (collection) { + message.collection = { + id: collection.name, + meta: collection + }; + } + + console.log('Message2: ' + JSON.stringify(message)); + + // const x = sendSQSMessage(message.resources.queues.startSF, message); + + // console.log('\n\nQueue message ' + x); + + // return x; + + return sendSQSMessage(queueUrl, message); +} +exports.queueWorkflowMessage = queueWorkflowMessage; From bab2cf00fa0d6b2c567a97f5a49dd41be79dacf8 Mon Sep 17 00:00:00 2001 From: Lauren Frederick Date: Wed, 7 Mar 2018 10:40:37 -0500 Subject: [PATCH 4/9] CUMULUS-359 debugging --- packages/api/lambdas/kinesis-consumer.js | 7 ++- packages/api/tests/test-kinesis-consumer.js | 61 ++++++++++----------- 2 files changed, 32 insertions(+), 36 deletions(-) diff --git a/packages/api/lambdas/kinesis-consumer.js b/packages/api/lambdas/kinesis-consumer.js index 338b954bfd9..9e1f8077e98 100644 --- a/packages/api/lambdas/kinesis-consumer.js +++ b/packages/api/lambdas/kinesis-consumer.js @@ -83,9 +83,10 @@ async function processRecord(record) { await validateMessage(eventObject) .then(getKinesisRules) - .then((kinesisRules) => ( - kinesisRules.map((kinesisRule) => queueMessageForRule(kinesisRule, eventObject)) - )); + .then((kinesisRules) => { + console.log(kinesisRules); + kinesisRules.map(async (kinesisRule) => await queueMessageForRule(kinesisRule, eventObject)); + }); } /** diff --git a/packages/api/tests/test-kinesis-consumer.js b/packages/api/tests/test-kinesis-consumer.js index 120629651c6..8df06cf983d 100644 --- a/packages/api/tests/test-kinesis-consumer.js +++ b/packages/api/tests/test-kinesis-consumer.js @@ -6,16 +6,10 @@ const get = require('lodash.get'); const { sqs, s3, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws'); const { createQueue, randomString } = require('@cumulus/common/test-utils'); -const tableName = 'rule'; -process.env.RulesTable = tableName; -process.env.stackName = 'test-stack'; -process.env.bucket = 'test-bucket'; -process.env.kinesisConsumer = 'test-kinesisConsumer'; const { getKinesisRules, handler } = require('../lambdas/kinesis-consumer'); const queue = require('../lambdas/queue'); const manager = require('../models/base'); const Rule = require('../models/rules'); -const model = new Rule(); const testCollectionName = 'test-collection'; const ruleTableParams = { @@ -79,10 +73,6 @@ function testCallback(err, object) { return object; } -test.before(async () => { - await manager.createTable(tableName, ruleTableParams); -}); - test.beforeEach(async (t) => { t.context.templateBucket = randomString(); await s3().createBucket({ Bucket: t.context.templateBucket }).promise(); @@ -106,7 +96,7 @@ test.beforeEach(async (t) => { Body: JSON.stringify(t.context.messageTemplate) }).promise(); - sinon.stub(Rule, 'buildPayload').callsFake(async (item) => { + sinon.stub(Rule, 'buildPayload').callsFake((item) => { return { template: `s3://${t.context.templateBucket}/${messageTemplateKey}`, provider: item.provider, @@ -115,45 +105,50 @@ test.beforeEach(async (t) => { payload: get(item, 'payload', {}) }; }); -}); -test.after.always(async () => { - await manager.deleteTable(tableName); + t.context.tableName = randomString(); + process.env.RulesTable = t.context.tableName; + process.env.stackName = randomString(); + process.env.bucket = randomString(); + process.env.kinesisConsumer = randomString(); + + const model = new Rule(t.context.tableName); + await manager.createTable(t.context.tableName, ruleTableParams); + await Promise.all([rule1Params, rule2Params, disabledRuleParams].map((rule) => model.create(rule))); }); test.afterEach(async (t) => { await Promise.all([ recursivelyDeleteS3Bucket(t.context.templateBucket), - sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise() + sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise(), + manager.deleteTable(t.context.tableName) ]); sinon.restore(Rule.buildPayload); }); // getKinesisRule tests // eslint-disable-next-line max-len -test('it should look up kinesis-type rules which are associated with the collection, but not those that are disabled', (t) => - Promise.all([rule1Params, rule2Params, disabledRuleParams].map((x) => model.create(x))) - .then(() => getKinesisRules(JSON.parse(eventData))) +test.serial('it should look up kinesis-type rules which are associated with the collection, but not those that are disabled', (t) => { + getKinesisRules(JSON.parse(eventData)) .then((result) => { t.is(result.length, 2); - }) -); - -// handler tests -test('it should create a onetime rule for each associated workflow', async (t) => { - await handler(event, {}, testCallback); - await getKinesisRules(JSON.parse(eventData)); - await sqs().receiveMessage({ - QueueUrl: t.context.queueUrl, - MaxNumberOfMessages: 10, - WaitTimeSeconds: 1 - }).promise() - .then((receiveMessageResponse) => { - console.log(receiveMessageResponse); - t.is(receiveMessageResponse.Messages.length, 4); }); }); +// handler tests +// test.serial('it should create a onetime rule for each associated workflow', async (t) => { +// await handler(event, {}, testCallback); +// await sqs().receiveMessage({ +// QueueUrl: t.context.queueUrl, +// MaxNumberOfMessages: 10, +// WaitTimeSeconds: 1 +// }).promise() +// .then((receiveMessageResponse) => { +// console.log(receiveMessageResponse); +// t.is(receiveMessageResponse.Messages.length, 4); +// }); +// }); + // test('it should throw an error if message does not include a collection', (t) => { // const invalidMessage = JSON.stringify({}); // const kinesisEvent = { From 7c55d28bc4530ce8534da9c72ac7dadc92fcd2fd Mon Sep 17 00:00:00 2001 From: Lauren Frederick Date: Wed, 7 Mar 2018 11:38:12 -0500 Subject: [PATCH 5/9] CUMULUS-359 Fix tests and some cleanup --- packages/api/index.js | 1 - packages/api/lambdas/kinesis-consumer.js | 11 +- packages/api/lambdas/queue.js | 17 --- packages/api/tests/test-kinesis-consumer.js | 111 ++++++++++---------- packages/ingest/queue.js | 8 -- 5 files changed, 61 insertions(+), 87 deletions(-) delete mode 100644 packages/api/lambdas/queue.js diff --git a/packages/api/index.js b/packages/api/index.js index 3a5ab6fcbed..5ce8e4579e8 100644 --- a/packages/api/index.js +++ b/packages/api/index.js @@ -19,7 +19,6 @@ exports.jobs = require('./lambdas/jobs'); exports.bootstrap = require('./lambdas/bootstrap').handler; exports.scheduler = require('./lambdas/sf-scheduler'); exports.starter = require('./lambdas/sf-starter'); -exports.queue = require('./lambdas/queue'); exports.kinesisConsumer = require('./lambdas/kinesis-consumer').handler; const indexer = require('./es/indexer'); diff --git a/packages/api/lambdas/kinesis-consumer.js b/packages/api/lambdas/kinesis-consumer.js index 9e1f8077e98..f4c16bec5df 100644 --- a/packages/api/lambdas/kinesis-consumer.js +++ b/packages/api/lambdas/kinesis-consumer.js @@ -3,7 +3,6 @@ const Ajv = require('ajv'); const Rule = require('../models/rules'); -const model = new Rule(); const messageSchema = require('./kinesis-consumer-event-schema.json'); const { queueWorkflowMessage } = require('@cumulus/ingest/queue'); @@ -16,6 +15,7 @@ const { queueWorkflowMessage } = require('@cumulus/ingest/queue'); */ async function getKinesisRules(event) { const collection = event.collection; + const model = new Rule(); const kinesisRules = await model.scan({ names: { '#col': 'collection', @@ -81,12 +81,11 @@ async function processRecord(record) { const dataString = Buffer.from(dataBlob, 'base64').toString(); const eventObject = JSON.parse(dataString); - await validateMessage(eventObject) + return validateMessage(eventObject) .then(getKinesisRules) - .then((kinesisRules) => { - console.log(kinesisRules); - kinesisRules.map(async (kinesisRule) => await queueMessageForRule(kinesisRule, eventObject)); - }); + .then((kinesisRules) => ( + Promise.all(kinesisRules.map((kinesisRule) => queueMessageForRule(kinesisRule, eventObject))) + )); } /** diff --git a/packages/api/lambdas/queue.js b/packages/api/lambdas/queue.js deleted file mode 100644 index c8834a52725..00000000000 --- a/packages/api/lambdas/queue.js +++ /dev/null @@ -1,17 +0,0 @@ -'use strict'; - -const { queueWorkflowMessage } = require('@cumulus/ingest/queue'); - -/** - * Handler for queue lambda task - * - * @param {*} event - * @param {*} context - * @param {*} cb - callback - */ -function handler(event, context, cb) { - return queueWorkflowMessage(event) - .then((r) => cb(null, r)) - .catch(cb); -} -module.exports = handler; diff --git a/packages/api/tests/test-kinesis-consumer.js b/packages/api/tests/test-kinesis-consumer.js index 8df06cf983d..ea096c20f7e 100644 --- a/packages/api/tests/test-kinesis-consumer.js +++ b/packages/api/tests/test-kinesis-consumer.js @@ -81,8 +81,6 @@ test.beforeEach(async (t) => { t.context.queueUrl = await createQueue(); - console.log(t.context.queueUrl); - t.context.messageTemplate = { cumulus_meta: { state_machine: t.context.stateMachineArn @@ -96,15 +94,15 @@ test.beforeEach(async (t) => { Body: JSON.stringify(t.context.messageTemplate) }).promise(); - sinon.stub(Rule, 'buildPayload').callsFake((item) => { - return { + sinon.stub(Rule, 'buildPayload').callsFake((item) => + Promise.resolve({ template: `s3://${t.context.templateBucket}/${messageTemplateKey}`, provider: item.provider, collection: item.collection, meta: get(item, 'meta', {}), payload: get(item, 'payload', {}) - }; - }); + }) + ); t.context.tableName = randomString(); process.env.RulesTable = t.context.tableName; @@ -114,7 +112,8 @@ test.beforeEach(async (t) => { const model = new Rule(t.context.tableName); await manager.createTable(t.context.tableName, ruleTableParams); - await Promise.all([rule1Params, rule2Params, disabledRuleParams].map((rule) => model.create(rule))); + await Promise.all([rule1Params, rule2Params, disabledRuleParams] + .map((rule) => model.create(rule))); }); test.afterEach(async (t) => { @@ -123,62 +122,64 @@ test.afterEach(async (t) => { sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise(), manager.deleteTable(t.context.tableName) ]); - sinon.restore(Rule.buildPayload); + Rule.buildPayload.restore(); }); // getKinesisRule tests // eslint-disable-next-line max-len test.serial('it should look up kinesis-type rules which are associated with the collection, but not those that are disabled', (t) => { - getKinesisRules(JSON.parse(eventData)) + return getKinesisRules(JSON.parse(eventData)) .then((result) => { t.is(result.length, 2); }); }); // handler tests -// test.serial('it should create a onetime rule for each associated workflow', async (t) => { -// await handler(event, {}, testCallback); -// await sqs().receiveMessage({ -// QueueUrl: t.context.queueUrl, -// MaxNumberOfMessages: 10, -// WaitTimeSeconds: 1 -// }).promise() -// .then((receiveMessageResponse) => { -// console.log(receiveMessageResponse); -// t.is(receiveMessageResponse.Messages.length, 4); -// }); -// }); - -// test('it should throw an error if message does not include a collection', (t) => { -// const invalidMessage = JSON.stringify({}); -// const kinesisEvent = { -// Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] -// }; -// return handler(kinesisEvent, {}, testCallback) -// .catch((err) => { -// const errObject = JSON.parse(err); -// t.is(errObject.errors[0].dataPath, ''); -// t.is(errObject.errors[0].message, 'should have required property \'collection\''); -// }); -// }); - -// test('it should throw an error if message collection has wrong data type', (t) => { -// const invalidMessage = JSON.stringify({ collection: {} }); -// const kinesisEvent = { -// Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] -// }; -// return handler(kinesisEvent, {}, testCallback) -// .catch((err) => { -// const errObject = JSON.parse(err); -// t.is(errObject.errors[0].dataPath, '.collection'); -// t.is(errObject.errors[0].message, 'should be string'); -// }); -// }); - -// test('it should not throw if message is valid', (t) => { -// const validMessage = JSON.stringify({ collection: 'confection-collection' }); -// const kinesisEvent = { -// Records: [{ kinesis: { data: new Buffer(validMessage).toString('base64') } }] -// }; -// return handler(kinesisEvent, {}, testCallback).then((r) => t.deepEqual(r, [])); -// }); +test.serial('it should create a onetime rule for each associated workflow', async (t) => { + await handler(event, {}, testCallback); + await sqs().receiveMessage({ + QueueUrl: t.context.queueUrl, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 1 + }).promise() + .then((receiveMessageResponse) => { + t.is(receiveMessageResponse.Messages.length, 4); + receiveMessageResponse.Messages.map((message) => ( + t.is(JSON.stringify(JSON.parse(message.Body).payload), JSON.stringify({ collection: 'test-collection' })) + )); + }); +}); + +test.serial('it should throw an error if message does not include a collection', (t) => { + const invalidMessage = JSON.stringify({}); + const kinesisEvent = { + Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] + }; + return handler(kinesisEvent, {}, testCallback) + .catch((err) => { + const errObject = JSON.parse(err); + t.is(errObject.errors[0].dataPath, ''); + t.is(errObject.errors[0].message, 'should have required property \'collection\''); + }); +}); + +test.serial('it should throw an error if message collection has wrong data type', (t) => { + const invalidMessage = JSON.stringify({ collection: {} }); + const kinesisEvent = { + Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] + }; + return handler(kinesisEvent, {}, testCallback) + .catch((err) => { + const errObject = JSON.parse(err); + t.is(errObject.errors[0].dataPath, '.collection'); + t.is(errObject.errors[0].message, 'should be string'); + }); +}); + +test.serial('it should not throw if message is valid', (t) => { + const validMessage = JSON.stringify({ collection: 'confection-collection' }); + const kinesisEvent = { + Records: [{ kinesis: { data: new Buffer(validMessage).toString('base64') } }] + }; + return handler(kinesisEvent, {}, testCallback).then((r) => t.deepEqual(r, [[]])); +}); diff --git a/packages/ingest/queue.js b/packages/ingest/queue.js index 1666d51191f..5e2b85107be 100644 --- a/packages/ingest/queue.js +++ b/packages/ingest/queue.js @@ -123,14 +123,6 @@ async function queueWorkflowMessage(event) { }; } - console.log('Message2: ' + JSON.stringify(message)); - - // const x = sendSQSMessage(message.resources.queues.startSF, message); - - // console.log('\n\nQueue message ' + x); - - // return x; - return sendSQSMessage(queueUrl, message); } exports.queueWorkflowMessage = queueWorkflowMessage; From 5a5619a32ec4901848a61a738d9d0c46044f189e Mon Sep 17 00:00:00 2001 From: Lauren Frederick Date: Wed, 7 Mar 2018 12:18:32 -0500 Subject: [PATCH 6/9] CUMULUS-359 Cleanup --- CHANGELOG.md | 4 ++++ packages/api/lambdas/kinesis-consumer.js | 10 +++++----- packages/api/tests/.eslintrc.json | 5 +++++ packages/api/tests/test-kinesis-consumer.js | 6 +----- packages/ingest/queue.js | 5 +++-- 5 files changed, 18 insertions(+), 12 deletions(-) create mode 100644 packages/api/tests/.eslintrc.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 42509a110b0..7fb0610e5bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,8 +6,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Removed +- Unused queue lambda in api/lambdas [CUMULUS-359] + ### Fixed - Kinesis message content is passed to the triggered workflow [CUMULUS-359] +- Kinesis message queues a workflow message and does not write to rules table [CUMULUS-359] ## [v1.1.0] - 2018-03-05 diff --git a/packages/api/lambdas/kinesis-consumer.js b/packages/api/lambdas/kinesis-consumer.js index f4c16bec5df..9b25938c00a 100644 --- a/packages/api/lambdas/kinesis-consumer.js +++ b/packages/api/lambdas/kinesis-consumer.js @@ -37,11 +37,11 @@ async function getKinesisRules(event) { /** * Queue a workflow message for the kinesis rule with the message passed - * to kinesis as the payload - * - * @param {*} kinesisRule - kinesis rule to queue the message for - * @param {*} eventObject - message passed to kinesis - * @param {Promise} - promise resolved when the message is queued + * to kinesis as the payload + * + * @param {Object} kinesisRule - kinesis rule to queue the message for + * @param {Object} eventObject - message passed to kinesis + * @returns {Promise} promise resolved when the message is queued */ async function queueMessageForRule(kinesisRule, eventObject) { const item = { diff --git a/packages/api/tests/.eslintrc.json b/packages/api/tests/.eslintrc.json new file mode 100644 index 00000000000..ada42bca77f --- /dev/null +++ b/packages/api/tests/.eslintrc.json @@ -0,0 +1,5 @@ +{ + "rules": { + "no-param-reassign": "off" + } +} diff --git a/packages/api/tests/test-kinesis-consumer.js b/packages/api/tests/test-kinesis-consumer.js index ea096c20f7e..71a32b3fd6f 100644 --- a/packages/api/tests/test-kinesis-consumer.js +++ b/packages/api/tests/test-kinesis-consumer.js @@ -7,7 +7,6 @@ const { sqs, s3, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws'); const { createQueue, randomString } = require('@cumulus/common/test-utils'); const { getKinesisRules, handler } = require('../lambdas/kinesis-consumer'); -const queue = require('../lambdas/queue'); const manager = require('../models/base'); const Rule = require('../models/rules'); const testCollectionName = 'test-collection'; @@ -66,10 +65,7 @@ const disabledRuleParams = Object.assign({}, commonRuleParams, { * @returns {Object} object, if no error is thrown */ function testCallback(err, object) { - if (err) { - console.log('ERROR: ' + err); - throw err; - } + if (err) throw err; return object; } diff --git a/packages/ingest/queue.js b/packages/ingest/queue.js index 5e2b85107be..48970ce1bea 100644 --- a/packages/ingest/queue.js +++ b/packages/ingest/queue.js @@ -92,8 +92,9 @@ exports.enqueueGranuleIngestMessage = enqueueGranuleIngestMessage; /** * Queue a workflow to be picked up by SF starter - * - * @param {*} event - event to queue with workflow and payload info + * + * @param {Object} event - event to queue with workflow and payload info + * @returns {Promise} - resolves when the message has been enqueued */ async function queueWorkflowMessage(event) { const template = get(event, 'template'); From 186dab2a18aaf397c8b1ef5a190276b7f61a11b6 Mon Sep 17 00:00:00 2001 From: Lauren Frederick Date: Thu, 8 Mar 2018 07:30:39 -0500 Subject: [PATCH 7/9] CUMULUS-359 PR comments, test for ingest queue --- packages/api/lambdas/kinesis-consumer.js | 5 +- packages/api/tests/test-kinesis-consumer.js | 10 ++-- packages/ingest/queue.js | 20 +------ packages/ingest/test/.eslintrc.json | 3 +- packages/ingest/test/queue.js | 63 +++++++++++++++++++++ 5 files changed, 76 insertions(+), 25 deletions(-) create mode 100644 packages/ingest/test/queue.js diff --git a/packages/api/lambdas/kinesis-consumer.js b/packages/api/lambdas/kinesis-consumer.js index 9b25938c00a..76edf7cac99 100644 --- a/packages/api/lambdas/kinesis-consumer.js +++ b/packages/api/lambdas/kinesis-consumer.js @@ -74,7 +74,8 @@ async function validateMessage(event) { * create rules. * * @param {*} record - input to the kinesis stream - * @returns {Promise} promise + * @returns {[Promises]} Array of promises. Each promise is resolved when a + * message is queued for all associated kinesis rules. */ async function processRecord(record) { const dataBlob = record.kinesis.data; @@ -101,7 +102,7 @@ async function processRecord(record) { function handler(event, context, cb) { const records = event.Records; - return Promise.all(records.map((r) => processRecord(r))) + return Promise.all(records.map(processRecord)) .then((results) => cb(null, results.filter((r) => r !== undefined))) .catch((err) => { cb(JSON.stringify(err)); diff --git a/packages/api/tests/test-kinesis-consumer.js b/packages/api/tests/test-kinesis-consumer.js index 71a32b3fd6f..9418bd50e97 100644 --- a/packages/api/tests/test-kinesis-consumer.js +++ b/packages/api/tests/test-kinesis-consumer.js @@ -123,7 +123,7 @@ test.afterEach(async (t) => { // getKinesisRule tests // eslint-disable-next-line max-len -test.serial('it should look up kinesis-type rules which are associated with the collection, but not those that are disabled', (t) => { +test('it should look up kinesis-type rules which are associated with the collection, but not those that are disabled', (t) => { return getKinesisRules(JSON.parse(eventData)) .then((result) => { t.is(result.length, 2); @@ -131,7 +131,7 @@ test.serial('it should look up kinesis-type rules which are associated with the }); // handler tests -test.serial('it should create a onetime rule for each associated workflow', async (t) => { +test('it should enqueue a message for each associated workflow', async (t) => { await handler(event, {}, testCallback); await sqs().receiveMessage({ QueueUrl: t.context.queueUrl, @@ -146,7 +146,7 @@ test.serial('it should create a onetime rule for each associated workflow', asyn }); }); -test.serial('it should throw an error if message does not include a collection', (t) => { +test('it should throw an error if message does not include a collection', (t) => { const invalidMessage = JSON.stringify({}); const kinesisEvent = { Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] @@ -159,7 +159,7 @@ test.serial('it should throw an error if message does not include a collection', }); }); -test.serial('it should throw an error if message collection has wrong data type', (t) => { +test('it should throw an error if message collection has wrong data type', (t) => { const invalidMessage = JSON.stringify({ collection: {} }); const kinesisEvent = { Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] @@ -172,7 +172,7 @@ test.serial('it should throw an error if message collection has wrong data type' }); }); -test.serial('it should not throw if message is valid', (t) => { +test('it should not throw if message is valid', (t) => { const validMessage = JSON.stringify({ collection: 'confection-collection' }); const kinesisEvent = { Records: [{ kinesis: { data: new Buffer(validMessage).toString('base64') } }] diff --git a/packages/ingest/queue.js b/packages/ingest/queue.js index 48970ce1bea..0a02e74fedd 100644 --- a/packages/ingest/queue.js +++ b/packages/ingest/queue.js @@ -104,26 +104,12 @@ async function queueWorkflowMessage(event) { const message = await getMessageFromTemplate(template); - let queueUrl = null; - - if (message.resources) { - queueUrl = message.resources.queues.startSF; - } - else { - queueUrl = message.meta.queues.startSF; - } + message.meta.provider = provider; + message.meta.collection = collection; - message.provider = provider; message.payload = payload; message.cumulus_meta.execution_name = uuidv4(); - if (collection) { - message.collection = { - id: collection.name, - meta: collection - }; - } - - return sendSQSMessage(queueUrl, message); + return sendSQSMessage(message.meta.queues.startSF, message); } exports.queueWorkflowMessage = queueWorkflowMessage; diff --git a/packages/ingest/test/.eslintrc.json b/packages/ingest/test/.eslintrc.json index d135721b8ff..6c199fd00d5 100644 --- a/packages/ingest/test/.eslintrc.json +++ b/packages/ingest/test/.eslintrc.json @@ -1,5 +1,6 @@ { "rules": { - "require-jsdoc": "off" + "require-jsdoc": "off", + "no-param-reassign": "off" } } diff --git a/packages/ingest/test/queue.js b/packages/ingest/test/queue.js new file mode 100644 index 00000000000..3e5c089b15f --- /dev/null +++ b/packages/ingest/test/queue.js @@ -0,0 +1,63 @@ +'use strict'; + +const test = require('ava'); +const queue = require('../queue'); +const { sqs, s3, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws'); +const { createQueue, randomString } = require('@cumulus/common/test-utils'); + +test.beforeEach(async (t) => { + t.context.templateBucket = randomString(); + await s3().createBucket({ Bucket: t.context.templateBucket }).promise(); + + t.context.queueUrl = await createQueue(); + + t.context.stateMachineArn = randomString(); + + t.context.messageTemplate = { + cumulus_meta: { + state_machine: t.context.stateMachineArn + }, + meta: { queues: { startSF: t.context.queueUrl } } + }; + + const messageTemplateKey = `${randomString()}/template.json`; + await s3().putObject({ + Bucket: t.context.templateBucket, + Key: messageTemplateKey, + Body: JSON.stringify(t.context.messageTemplate) + }).promise(); + + t.context.template = `s3://${t.context.templateBucket}/${messageTemplateKey}`; +}); + +test.afterEach(async (t) => { + await Promise.all([ + recursivelyDeleteS3Bucket(t.context.templateBucket), + sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise() + ]); +}); + +test('the queue receives a correctly formatted workflow message', async (t) => { + const event = { + template: t.context.template, + provider: 'PROV1', + collection: { name: 'test-collection' }, + payload: { test: 'test payload' } + }; + + await queue.queueWorkflowMessage(event); + await sqs().receiveMessage({ + QueueUrl: t.context.queueUrl, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 1 + }).promise() + .then((receiveMessageResponse) => { + t.is(receiveMessageResponse.Messages.length, 1); + + const message = JSON.parse(receiveMessageResponse.Messages[0].Body); + t.is(message.meta.provider, 'PROV1'); + t.is(JSON.stringify(message.meta.collection), JSON.stringify({ name: 'test-collection' })); + t.is(JSON.stringify(message.payload), JSON.stringify({ test: 'test payload' })); + t.is(message.cumulus_meta.state_machine, t.context.stateMachineArn); + }); +}); From 67652c176a8544f46cc00a8113183c7a31a695d6 Mon Sep 17 00:00:00 2001 From: Lauren Frederick Date: Thu, 8 Mar 2018 07:33:46 -0500 Subject: [PATCH 8/9] CUMULUS-359 Missed fix --- packages/ingest/queue.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/ingest/queue.js b/packages/ingest/queue.js index 0a02e74fedd..702f2789d48 100644 --- a/packages/ingest/queue.js +++ b/packages/ingest/queue.js @@ -43,9 +43,7 @@ async function enqueueParsePdrMessage( message.meta.provider = provider; message.meta.collection = collection; - message.payload = { - pdr - }; + message.payload = { pdr }; return sendSQSMessage(queueUrl, message); } From 9411a385ce5f260e1ccf3ad35e6e23e2c5502bf4 Mon Sep 17 00:00:00 2001 From: Lauren Frederick Date: Thu, 8 Mar 2018 09:57:29 -0500 Subject: [PATCH 9/9] CUMULUS-359 More PR fixes --- packages/api/lambdas/kinesis-consumer.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/api/lambdas/kinesis-consumer.js b/packages/api/lambdas/kinesis-consumer.js index 76edf7cac99..c00da611b4f 100644 --- a/packages/api/lambdas/kinesis-consumer.js +++ b/packages/api/lambdas/kinesis-consumer.js @@ -52,7 +52,7 @@ async function queueMessageForRule(kinesisRule, eventObject) { }; return Rule.buildPayload(item) - .then((payload) => queueWorkflowMessage(payload)); + .then(queueWorkflowMessage); } /** @@ -71,13 +71,13 @@ async function validateMessage(event) { /** * Process data sent to a kinesis stream. Validate the data and - * create rules. + * queue a workflow message for each rule. * * @param {*} record - input to the kinesis stream * @returns {[Promises]} Array of promises. Each promise is resolved when a * message is queued for all associated kinesis rules. */ -async function processRecord(record) { +function processRecord(record) { const dataBlob = record.kinesis.data; const dataString = Buffer.from(dataBlob, 'base64').toString(); const eventObject = JSON.parse(dataString); @@ -91,8 +91,8 @@ async function processRecord(record) { /** * `handler` Looks up enabled 'kinesis'-type rules associated with the collection - * in the event argument. It creates new onetime rules for each rule found to trigger - * the workflow defined in the 'kinesis'-type rule. + * in the event argument. It enqueues a message for each kinesis-type rule to trigger + * the associated workflow. * * @param {*} event - lambda event * @param {*} context - lambda context