diff --git a/CHANGELOG.md b/CHANGELOG.md index 135046f382a..7fb0610e5bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Removed +- Unused queue lambda in api/lambdas [CUMULUS-359] + +### Fixed +- Kinesis message content is passed to the triggered workflow [CUMULUS-359] +- Kinesis message queues a workflow message and does not write to rules table [CUMULUS-359] + ## [v1.1.0] - 2018-03-05 ### Added diff --git a/packages/api/index.js b/packages/api/index.js index 3a5ab6fcbed..5ce8e4579e8 100644 --- a/packages/api/index.js +++ b/packages/api/index.js @@ -19,7 +19,6 @@ exports.jobs = require('./lambdas/jobs'); exports.bootstrap = require('./lambdas/bootstrap').handler; exports.scheduler = require('./lambdas/sf-scheduler'); exports.starter = require('./lambdas/sf-starter'); -exports.queue = require('./lambdas/queue'); exports.kinesisConsumer = require('./lambdas/kinesis-consumer').handler; const indexer = require('./es/indexer'); diff --git a/packages/api/lambdas/kinesis-consumer.js b/packages/api/lambdas/kinesis-consumer.js index 15f203c25b5..c00da611b4f 100644 --- a/packages/api/lambdas/kinesis-consumer.js +++ b/packages/api/lambdas/kinesis-consumer.js @@ -3,17 +3,19 @@ const Ajv = require('ajv'); const Rule = require('../models/rules'); -const model = new Rule(); const messageSchema = require('./kinesis-consumer-event-schema.json'); +const { queueWorkflowMessage } = require('@cumulus/ingest/queue'); /** - * `getKinesisRules` scans and returns DynamoDB rules table for enabled, 'kinesis'-type rules associated with the * collection declared in the event + * `getKinesisRules` scans and returns DynamoDB rules table for enabled, + * 'kinesis'-type rules associated with the * collection declared in the event * - * @param {object} event lambda event - * @returns {array} List of zero or more rules found from table scan + * @param {Object} event - lambda event + * @returns {Array} List of zero or more rules found from table scan */ async function getKinesisRules(event) { const collection = event.collection; + const model = new Rule(); const kinesisRules = await model.scan({ names: { '#col': 'collection', @@ -34,63 +36,74 @@ async function getKinesisRules(event) { } /** - * `createOneTimeRules` creates new rules with the same data as a kinesis-type rule, except the type is modified to 'onetime'. + * Queue a workflow message for the kinesis rule with the message passed + * to kinesis as the payload * - * @param {array} kinesisRules list of rule objects - * @returns {array} Array of promises for model.create + * @param {Object} kinesisRule - kinesis rule to queue the message for + * @param {Object} eventObject - message passed to kinesis + * @returns {Promise} promise resolved when the message is queued */ -async function createOneTimeRules(kinesisRules) { - const oneTimeRulePromises = kinesisRules.map((kinesisRule) => { - const oneTimeRuleParams = Object.assign({}, kinesisRule); - delete oneTimeRuleParams['createdAt']; - delete oneTimeRuleParams['updatedAt']; - oneTimeRuleParams.name = `${kinesisRule.name}_${Date.now().toString()}`; - oneTimeRuleParams.rule.type = 'onetime'; - return model.create(oneTimeRuleParams); - }); +async function queueMessageForRule(kinesisRule, eventObject) { + const item = { + workflow: kinesisRule.workflow, + provider: kinesisRule.provider, + collection: kinesisRule.collection, + payload: eventObject + }; - return await Promise.all(oneTimeRulePromises); + return Rule.buildPayload(item) + .then(queueWorkflowMessage); } /** - * `validateMessage` validates an event as being valid for creating a workflow. See the messageSchema defined at - * the top of this file. + * `validateMessage` validates an event as being valid for creating a workflow. + * See the messageSchema defined at the top of this file. * - * @param {object} event lambda event - * @returns {(error|object)} Throws an Ajv.ValidationError if event object is invalid. Returns the event object if event is valid. + * @param {Object} event - lambda event + * @returns {(error|Object)} Throws an Ajv.ValidationError if event object is invalid. + * Returns the event object if event is valid. */ async function validateMessage(event) { - const ajv = new Ajv({allErrors: true}); + const ajv = new Ajv({ allErrors: true }); const validate = ajv.compile(messageSchema); return await validate(event); } -async function processRecord(record) { +/** + * Process data sent to a kinesis stream. Validate the data and + * queue a workflow message for each rule. + * + * @param {*} record - input to the kinesis stream + * @returns {[Promises]} Array of promises. Each promise is resolved when a + * message is queued for all associated kinesis rules. + */ +function processRecord(record) { const dataBlob = record.kinesis.data; const dataString = Buffer.from(dataBlob, 'base64').toString(); const eventObject = JSON.parse(dataString); - await validateMessage(eventObject) + return validateMessage(eventObject) .then(getKinesisRules) - .then((kinesisRules) => { - return createOneTimeRules(kinesisRules); - }); + .then((kinesisRules) => ( + Promise.all(kinesisRules.map((kinesisRule) => queueMessageForRule(kinesisRule, eventObject))) + )); } /** - * `handler` Looks up enabled 'kinesis'-type rules associated with the collection in the event argument. It - * creates new onetime rules for each rule found to trigger the workflow defined in the 'kinesis'-type rule. + * `handler` Looks up enabled 'kinesis'-type rules associated with the collection + * in the event argument. It enqueues a message for each kinesis-type rule to trigger + * the associated workflow. * - * @param {*} event lambda event - * @param {*} context lambda context - * @param {*} cb callback function to explicitly return information back to the caller. + * @param {*} event - lambda event + * @param {*} context - lambda context + * @param {*} cb - callback function to explicitly return information back to the caller. * @returns {(error|string)} Success message or error */ function handler(event, context, cb) { const records = event.Records; - return Promise.all(records.map(r => processRecord(r))) - .then((results) => cb(null, results.filter(r => r !== undefined))) + return Promise.all(records.map(processRecord)) + .then((results) => cb(null, results.filter((r) => r !== undefined))) .catch((err) => { cb(JSON.stringify(err)); }); diff --git a/packages/api/lambdas/queue.js b/packages/api/lambdas/queue.js deleted file mode 100644 index f8dc5c373f5..00000000000 --- a/packages/api/lambdas/queue.js +++ /dev/null @@ -1,34 +0,0 @@ -'use strict'; - -const get = require('lodash.get'); -const aws = require('@cumulus/ingest/aws'); -const uuidv4 = require('uuid/v4'); - -function handler(event, context, cb) { - const template = get(event, 'template'); - const provider = get(event, 'provider', {}); - const meta = get(event, 'meta', {}); - const collection = get(event, 'collection', {}); - const payload = get(event, 'payload', {}); - - const parsed = aws.S3.parseS3Uri(template); - aws.S3.get(parsed.Bucket, parsed.Key).then((data) => { - const message = JSON.parse(data.Body); - message.provider = provider; - message.meta = meta; - message.payload = payload; - message.cumulus_meta.execution_name = uuidv4(); - - if (collection) { - message.collection = { - id: collection.name, - meta: collection - }; - } - - return aws.SQS.sendMessage(message.resources.queues.startSF, message) - .then((r) => cb(null, r)); - }) - .catch(cb); -} -module.exports = handler; diff --git a/packages/api/tests/.eslintrc.json b/packages/api/tests/.eslintrc.json new file mode 100644 index 00000000000..ada42bca77f --- /dev/null +++ b/packages/api/tests/.eslintrc.json @@ -0,0 +1,5 @@ +{ + "rules": { + "no-param-reassign": "off" + } +} diff --git a/packages/api/tests/test-kinesis-consumer.js b/packages/api/tests/test-kinesis-consumer.js index 254c5e7363b..9418bd50e97 100644 --- a/packages/api/tests/test-kinesis-consumer.js +++ b/packages/api/tests/test-kinesis-consumer.js @@ -2,16 +2,13 @@ const test = require('ava'); const sinon = require('sinon'); +const get = require('lodash.get'); +const { sqs, s3, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws'); +const { createQueue, randomString } = require('@cumulus/common/test-utils'); -const tableName = 'rule'; -process.env.RulesTable = tableName; -process.env.stackName = 'test-stack'; -process.env.bucket = 'test-bucket'; -process.env.kinesisConsumer = 'test-kinesisConsumer'; const { getKinesisRules, handler } = require('../lambdas/kinesis-consumer'); const manager = require('../models/base'); const Rule = require('../models/rules'); -const model = new Rule(); const testCollectionName = 'test-collection'; const ruleTableParams = { @@ -26,8 +23,8 @@ const eventData = JSON.stringify({ const event = { Records: [ - {kinesis: {data: new Buffer(eventData).toString('base64')}}, - {kinesis: {data: new Buffer(eventData).toString('base64')}} + { kinesis: { data: new Buffer(eventData).toString('base64') } }, + { kinesis: { data: new Buffer(eventData).toString('base64') } } ] }; @@ -36,6 +33,7 @@ const commonRuleParams = { name: testCollectionName, version: '0.0.0' }, + provider: 'PROV1', rule: { type: 'kinesis', value: 'test-kinesisarn' @@ -59,68 +57,101 @@ const disabledRuleParams = Object.assign({}, commonRuleParams, { state: 'DISABLED' }); +/** + * Callback used for testing + * + * @param {*} err - error + * @param {Object} object - object + * @returns {Object} object, if no error is thrown + */ function testCallback(err, object) { if (err) throw err; return object; -}; +} + +test.beforeEach(async (t) => { + t.context.templateBucket = randomString(); + await s3().createBucket({ Bucket: t.context.templateBucket }).promise(); + + t.context.stateMachineArn = randomString(); -test.before(async () => { - sinon.stub(Rule, 'buildPayload').resolves(true); - await manager.createTable(tableName, ruleTableParams); + t.context.queueUrl = await createQueue(); + + t.context.messageTemplate = { + cumulus_meta: { + state_machine: t.context.stateMachineArn + }, + meta: { queues: { startSF: t.context.queueUrl } } + }; + const messageTemplateKey = `${randomString()}/template.json`; + await s3().putObject({ + Bucket: t.context.templateBucket, + Key: messageTemplateKey, + Body: JSON.stringify(t.context.messageTemplate) + }).promise(); + + sinon.stub(Rule, 'buildPayload').callsFake((item) => + Promise.resolve({ + template: `s3://${t.context.templateBucket}/${messageTemplateKey}`, + provider: item.provider, + collection: item.collection, + meta: get(item, 'meta', {}), + payload: get(item, 'payload', {}) + }) + ); + + t.context.tableName = randomString(); + process.env.RulesTable = t.context.tableName; + process.env.stackName = randomString(); + process.env.bucket = randomString(); + process.env.kinesisConsumer = randomString(); + + const model = new Rule(t.context.tableName); + await manager.createTable(t.context.tableName, ruleTableParams); + await Promise.all([rule1Params, rule2Params, disabledRuleParams] + .map((rule) => model.create(rule))); }); -test.after.always(async () => { - await manager.deleteTable(tableName); +test.afterEach(async (t) => { + await Promise.all([ + recursivelyDeleteS3Bucket(t.context.templateBucket), + sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise(), + manager.deleteTable(t.context.tableName) + ]); + Rule.buildPayload.restore(); }); // getKinesisRule tests -test('it should look up kinesis-type rules which are associated with the collection, but not those that are disabled', t => { - return Promise.all([rule1Params, rule2Params, disabledRuleParams].map(x => model.create(x))) - .then(() => { - return getKinesisRules(JSON.parse(eventData)) - }).then((result) => { +// eslint-disable-next-line max-len +test('it should look up kinesis-type rules which are associated with the collection, but not those that are disabled', (t) => { + return getKinesisRules(JSON.parse(eventData)) + .then((result) => { t.is(result.length, 2); }); }); // handler tests -test('it should create a onetime rule for each associated workflow', async t => { - await handler(event, {}, testCallback).then(() => { - return model.scan({ - names: { - '#col': 'collection', - '#nm': 'name', - '#st': 'state', - '#rl': 'rule', - '#tp': 'type' - }, - filter: '#st = :enabledState AND #col.#nm = :collectionName AND #rl.#tp = :ruleType', - values: { - ':enabledState': 'ENABLED', - ':collectionName': testCollectionName, - ':ruleType': 'onetime' - } - }); - }) - .then((results) => { - t.is(results.Items.length, 4); - const workflowNames = results.Items.map(i => i.workflow).sort(); - t.deepEqual(workflowNames, [ - 'test-workflow-1', - 'test-workflow-1', - 'test-workflow-2', - 'test-workflow-2' - ]); - results.Items.forEach(r => t.is(r.rule.type, 'onetime')); - }); +test('it should enqueue a message for each associated workflow', async (t) => { + await handler(event, {}, testCallback); + await sqs().receiveMessage({ + QueueUrl: t.context.queueUrl, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 1 + }).promise() + .then((receiveMessageResponse) => { + t.is(receiveMessageResponse.Messages.length, 4); + receiveMessageResponse.Messages.map((message) => ( + t.is(JSON.stringify(JSON.parse(message.Body).payload), JSON.stringify({ collection: 'test-collection' })) + )); + }); }); -test('it should throw an error if message does not include a collection', t => { +test('it should throw an error if message does not include a collection', (t) => { const invalidMessage = JSON.stringify({}); - const event = { - Records: [{kinesis: {data: new Buffer(invalidMessage).toString('base64')}}] + const kinesisEvent = { + Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] }; - return handler(event, {}, testCallback) + return handler(kinesisEvent, {}, testCallback) .catch((err) => { const errObject = JSON.parse(err); t.is(errObject.errors[0].dataPath, ''); @@ -128,12 +159,12 @@ test('it should throw an error if message does not include a collection', t => { }); }); -test('it should throw an error if message collection has wrong data type', t => { - const invalidMessage = JSON.stringify({collection: {}}); - const event = { - Records: [{kinesis: {data: new Buffer(invalidMessage).toString('base64')}}] +test('it should throw an error if message collection has wrong data type', (t) => { + const invalidMessage = JSON.stringify({ collection: {} }); + const kinesisEvent = { + Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }] }; - return handler(event, {}, testCallback) + return handler(kinesisEvent, {}, testCallback) .catch((err) => { const errObject = JSON.parse(err); t.is(errObject.errors[0].dataPath, '.collection'); @@ -141,10 +172,10 @@ test('it should throw an error if message collection has wrong data type', t => }); }); -test('it should not throw if message is valid', t => { - const validMessage = JSON.stringify({collection: 'confection-collection'}); - const event = { - Records: [{kinesis: {data: new Buffer(validMessage).toString('base64')}}] +test('it should not throw if message is valid', (t) => { + const validMessage = JSON.stringify({ collection: 'confection-collection' }); + const kinesisEvent = { + Records: [{ kinesis: { data: new Buffer(validMessage).toString('base64') } }] }; - return handler(event, {}, testCallback).then(r => t.deepEqual(r, [])); + return handler(kinesisEvent, {}, testCallback).then((r) => t.deepEqual(r, [[]])); }); diff --git a/packages/ingest/queue.js b/packages/ingest/queue.js index 7990c27120f..702f2789d48 100644 --- a/packages/ingest/queue.js +++ b/packages/ingest/queue.js @@ -5,13 +5,15 @@ const { sendSQSMessage, parseS3Uri } = require('@cumulus/common/aws'); +const get = require('lodash.get'); +const uuidv4 = require('uuid/v4'); /** - * Create a message from a template stored on S3 - * - * @param {string} templateUri - S3 uri to the workflow template - * @returns {Promise} message object - **/ + * Create a message from a template stored on S3 + * + * @param {string} templateUri - S3 uri to the workflow template + * @returns {Promise} message object + **/ async function getMessageFromTemplate(templateUri) { const parsedS3Uri = parseS3Uri(templateUri); const data = await getS3Object(parsedS3Uri.Bucket, parsedS3Uri.Key); @@ -85,3 +87,27 @@ async function enqueueGranuleIngestMessage( return sendSQSMessage(queueUrl, message); } exports.enqueueGranuleIngestMessage = enqueueGranuleIngestMessage; + +/** + * Queue a workflow to be picked up by SF starter + * + * @param {Object} event - event to queue with workflow and payload info + * @returns {Promise} - resolves when the message has been enqueued + */ +async function queueWorkflowMessage(event) { + const template = get(event, 'template'); + const provider = get(event, 'provider', {}); + const collection = get(event, 'collection', {}); + const payload = get(event, 'payload', {}); + + const message = await getMessageFromTemplate(template); + + message.meta.provider = provider; + message.meta.collection = collection; + + message.payload = payload; + message.cumulus_meta.execution_name = uuidv4(); + + return sendSQSMessage(message.meta.queues.startSF, message); +} +exports.queueWorkflowMessage = queueWorkflowMessage; diff --git a/packages/ingest/test/.eslintrc.json b/packages/ingest/test/.eslintrc.json index d135721b8ff..6c199fd00d5 100644 --- a/packages/ingest/test/.eslintrc.json +++ b/packages/ingest/test/.eslintrc.json @@ -1,5 +1,6 @@ { "rules": { - "require-jsdoc": "off" + "require-jsdoc": "off", + "no-param-reassign": "off" } } diff --git a/packages/ingest/test/queue.js b/packages/ingest/test/queue.js new file mode 100644 index 00000000000..3e5c089b15f --- /dev/null +++ b/packages/ingest/test/queue.js @@ -0,0 +1,63 @@ +'use strict'; + +const test = require('ava'); +const queue = require('../queue'); +const { sqs, s3, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws'); +const { createQueue, randomString } = require('@cumulus/common/test-utils'); + +test.beforeEach(async (t) => { + t.context.templateBucket = randomString(); + await s3().createBucket({ Bucket: t.context.templateBucket }).promise(); + + t.context.queueUrl = await createQueue(); + + t.context.stateMachineArn = randomString(); + + t.context.messageTemplate = { + cumulus_meta: { + state_machine: t.context.stateMachineArn + }, + meta: { queues: { startSF: t.context.queueUrl } } + }; + + const messageTemplateKey = `${randomString()}/template.json`; + await s3().putObject({ + Bucket: t.context.templateBucket, + Key: messageTemplateKey, + Body: JSON.stringify(t.context.messageTemplate) + }).promise(); + + t.context.template = `s3://${t.context.templateBucket}/${messageTemplateKey}`; +}); + +test.afterEach(async (t) => { + await Promise.all([ + recursivelyDeleteS3Bucket(t.context.templateBucket), + sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise() + ]); +}); + +test('the queue receives a correctly formatted workflow message', async (t) => { + const event = { + template: t.context.template, + provider: 'PROV1', + collection: { name: 'test-collection' }, + payload: { test: 'test payload' } + }; + + await queue.queueWorkflowMessage(event); + await sqs().receiveMessage({ + QueueUrl: t.context.queueUrl, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 1 + }).promise() + .then((receiveMessageResponse) => { + t.is(receiveMessageResponse.Messages.length, 1); + + const message = JSON.parse(receiveMessageResponse.Messages[0].Body); + t.is(message.meta.provider, 'PROV1'); + t.is(JSON.stringify(message.meta.collection), JSON.stringify({ name: 'test-collection' })); + t.is(JSON.stringify(message.payload), JSON.stringify({ test: 'test payload' })); + t.is(message.cumulus_meta.state_machine, t.context.stateMachineArn); + }); +});