From 566a429954ebd4ee08a818487b704432ffe92f0f Mon Sep 17 00:00:00 2001
From: Aimee Barciauskas
Date: Mon, 19 Mar 2018 11:57:54 -0400
Subject: [PATCH] CUMULUS-359 kinesis consumer uses sf-scheduler (#262)

**Summary:** Kinesis consumer uses sf-scheduler

Closes #262

Addresses [CUMULUS-359](https://bugs.earthdata.nasa.gov/browse/CUMULUS-359)

## Detailed Changes

* The kinesis consumer now calls the sf-scheduler function `schedule`
* Updated tests
* The kinesis consumer lambda now requires environment variables for the collections and providers tables used in sf-scheduler#schedule
* (unrelated) Fixes eslint errors in `packages/api/lambdas`

## Test Plan

- [x] Unit tests
- [x] Adhoc testing

Reviewers: @laurenfrederick
---
 .eslint-ratchet-high-water-mark             |   2 +-
 CHANGELOG.md                                |   9 ++
 packages/api/config/lambdas.yml             |   7 +-
 packages/api/lambdas/bootstrap.js           |  17 ++-
 packages/api/lambdas/db-indexer.js          |  13 +-
 packages/api/lambdas/jobs.js                |  23 ++--
 packages/api/lambdas/kinesis-consumer.js    |  25 ++--
 packages/api/lambdas/sf-scheduler.js        |  13 +-
 packages/api/tests/test-kinesis-consumer.js | 130 +++++++++++---------
 packages/ingest/README.md                   |  10 ++
 packages/ingest/aws.js                      |   4 +-
 packages/ingest/queue.js                    |  29 +----
 packages/ingest/test/queue.js               |  47 ++++---
 13 files changed, 176 insertions(+), 153 deletions(-)

diff --git a/.eslint-ratchet-high-water-mark b/.eslint-ratchet-high-water-mark
index fc2748e2649..98102958a72 100644
--- a/.eslint-ratchet-high-water-mark
+++ b/.eslint-ratchet-high-water-mark
@@ -1 +1 @@
-1910
+1874

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d8c3ddaceff..fbe7f4970b9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,16 +5,25 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
 and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
+
 ### Fixed
 - Update vulnerable npm packages [CUMULUS-425]
+- `@cumulus/api`: `kinesis-consumer.js` uses `sf-scheduler.js#schedule` instead of placing a message directly on the `startSF` SQS queue. This is a fix for [CUMULUS-359](https://bugs.earthdata.nasa.gov/browse/CUMULUS-359) because `sf-scheduler.js#schedule` looks up the provider and collection data in DynamoDB and adds it to the `meta` object of the enqueued message payload.
+- `@cumulus/api`: `kinesis-consumer.js` catches and logs errors instead of doing an error callback. Before this change, `kinesis-consumer` failed to process new records when an existing record caused an error: it would call back with the error and stop processing additional records, then keep retrying the failing record because its position in the stream was unchanged. Catching and logging the errors is part 1 of the fix. Proposed part 2 is to enqueue the error and the message on a "dead-letter" queue so it can be processed later ([CUMULUS-413](https://bugs.earthdata.nasa.gov/browse/CUMULUS-413)).
+
+### Removed
+
+- `@cumulus/ingest/aws`: Removed `queueWorkflowMessage`, which is no longer used by `@cumulus/api`'s `kinesis-consumer.js`.
 
 ## [v1.1.4] - 2018-03-15
 ### Added
 - added flag `useList` to parse-pdr [CUMULUS-404]
 
 ### Fixed
+
 - Pass encrypted password to the ApiGranule Lambda function [CUMULUS-424]
+
 ## [v1.1.3] - 2018-03-14
 ### Fixed
 - Changed @cumulus/deployment package install behavior. The build process will happen after installation
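For orientation before the per-file diffs: the core of this patch is that the consumer no longer writes to the `startSF` queue itself, but instead invokes the callback-style `sf-scheduler` Lambda handler directly, adapting it to a Promise. This is a minimal sketch of the pattern used in `queueMessageForRule` (see the `kinesis-consumer.js` hunk below), for illustration only — it is not part of the patch:

```js
// Sketch: adapt sf-scheduler's (event, context, callback) Lambda
// signature to a Promise so the consumer can await it, as
// queueMessageForRule does in this patch.
const sfSchedule = require('./sf-scheduler');

function scheduleWorkflow(payload) {
  return new Promise((resolve, reject) => {
    sfSchedule(payload, {}, (err, result) => {
      if (err) reject(err);
      else resolve(result);
    });
  });
}
```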
diff --git a/packages/api/config/lambdas.yml b/packages/api/config/lambdas.yml
index 24b8afaddfa..c5b8699e34c 100644
--- a/packages/api/config/lambdas.yml
+++ b/packages/api/config/lambdas.yml
@@ -61,9 +61,12 @@ kinesisConsumer:
     RulesTable:
       function: Ref
       value: RulesTableDynamoDB
-    invoke:
+    CollectionsTable:
       function: Ref
-      value: ScheduleSFLambdaFunction
+      value: CollectionsTableDynamoDB
+    ProvidersTable:
+      function: Ref
+      value: ProvidersTableDynamoDB
   bucket: '{{buckets.internal}}'
 
 ScheduleSF:

diff --git a/packages/api/lambdas/bootstrap.js b/packages/api/lambdas/bootstrap.js
index d505e335b2a..5ca47596337 100644
--- a/packages/api/lambdas/bootstrap.js
+++ b/packages/api/lambdas/bootstrap.js
@@ -10,6 +10,7 @@
  * - creating API users
  * - encrypting CMR user/pass and adding it to configuration files
  */
+
 'use strict';
 
 const https = require('https');
@@ -44,19 +45,17 @@ async function bootstrapElasticSearch(host, index = 'cumulus') {
   else {
     log.info(`index ${index} already exists`);
   }
-
-  return;
 }
 
 async function bootstrapUsers(table, records) {
   if (!table) {
-    return new Promise(resolve => resolve());
+    return new Promise((resolve) => resolve());
   }
   const user = new Manager(table);
 
   // delete all user records
   const existingUsers = await user.scan();
-  await Promise.all(existingUsers.Items.map(u => user.delete({ userName: u.userName })));
+  await Promise.all(existingUsers.Items.map((u) => user.delete({ userName: u.userName })));
 
   // add new ones
   const additions = records.map((record) => user.create({
     userName: record.username,
@@ -69,7 +68,7 @@ async function bootstrapUsers(table, records) {
 
 async function bootstrapCmrProvider(password) {
   if (!password) {
-    return new Promise(resolve => resolve('nopassword'));
+    return new Promise((resolve) => resolve('nopassword'));
   }
   return DefaultProvider.encrypt(password);
 }
@@ -140,7 +139,7 @@ function handler(event, context, cb) {
     };
 
     return sendResponse(event, 'SUCCESS', data, cb);
-  }).catch(e => {
+  }).catch((e) => {
     log.error(e);
     return sendResponse(event, 'FAILED', null, cb);
   });
@@ -155,8 +154,8 @@ justLocalRun(() => {
   //const a = {};
   //handler(a, {}, (e, r) => console.log(e, r));
   //bootstrapCmrProvider('testing').then(r => {
-    //console.log(r)
-    //return DefaultProvider.decrypt(r)
+  //console.log(r)
+  //return DefaultProvider.decrypt(r)
   //}).then(r => console.log(r))
-    //.catch(e => console.log(e));
+  //.catch(e => console.log(e));
 });

diff --git a/packages/api/lambdas/db-indexer.js b/packages/api/lambdas/db-indexer.js
index 1ea58dddff3..a2a8a936ab0 100644
--- a/packages/api/lambdas/db-indexer.js
+++ b/packages/api/lambdas/db-indexer.js
@@ -17,7 +17,7 @@ function indexRecord(esClient, record) {
   //determine whether the record should be indexed
   const acceptedTables = ['Collection', 'Provider', 'Rule'];
-  const tableConfig = {}
+  const tableConfig = {};
   acceptedTables.forEach((a) => {
     tableConfig[`${stack}-${a}sTable`] = indexer[`index${a}`];
   });
@@ -54,21 +54,20 @@ function indexRecord(esClient, record) {
 }
 
 async function indexRecords(records) {
-  const concurrencyLimit = process.env.CONCURRENCY || 3
+  const concurrencyLimit = process.env.CONCURRENCY || 3;
   const limit = pLimit(concurrencyLimit);
   const esClient = await Search.es();
 
-  const promises = records.map((record) => limit(
-    () => indexRecord(esClient, record)
-  ));
+  const promises = records.map((record) => limit(() => indexRecord(esClient, record)));
   return Promise.all(promises);
 }
 
 /**
  * Sync changes to dynamodb to an elasticsearch instance.
 * Sending updates to this lambda is handled automatically by AWS.
- * @param {array} Records list of records with an eventName property signifying REMOVE or INSERT.
- * @return {string} response text indicating the number of records altered in elasticsearch.
+ *
+ * @param {Array} Records - list of records with an eventName property signifying REMOVE or INSERT.
+ * @returns {string} response text indicating the number of records altered in elasticsearch.
  */
 function handler(event, context, cb) {
   const records = event.Records;

diff --git a/packages/api/lambdas/jobs.js b/packages/api/lambdas/jobs.js
index 1cd80f4d82c..7c38385772a 100644
--- a/packages/api/lambdas/jobs.js
+++ b/packages/api/lambdas/jobs.js
@@ -1,4 +1,5 @@
 /* runs a bunch of periodic jobs to keep the database up to date */
+
 'use strict';
 
 const get = require('lodash.get');
@@ -19,8 +20,8 @@ async function findStaleRecords(type, q, limit = 100, page = 1) {
   const response = await search.query();
 
   //if (response.results.length >= limit) {
-    //const more = await findStaleRecords(type, q, limit, page + 1);
-    //return response.results.concat(more);
+  //const more = await findStaleRecords(type, q, limit, page + 1);
+  //return response.results.concat(more);
   //}
   return response.results;
 }
@@ -29,16 +30,12 @@ async function updateGranulesAndPdrs(esClient, url, error) {
   // find related granule and update their status
   let searchTerm = `execution:"${url}"`;
   const granules = await findStaleRecords('granule', searchTerm, 100);
-  await Promise.all(granules.map(g => partialRecordUpdate(
-    esClient, g.granuleId, 'granule', { status: 'failed', error }, g.collectionId
-  )));
+  await Promise.all(granules.map((g) => partialRecordUpdate(esClient, g.granuleId, 'granule', { status: 'failed', error }, g.collectionId)));
 
   // find related pdrs and update their status
   searchTerm = `execution:"${url}"`;
   const pdrs = await findStaleRecords('pdr', searchTerm, 100);
-  await Promise.all(pdrs.map(p => partialRecordUpdate(
-    esClient, p.pdrName, 'pdr', { status: 'failed', error }
-  )));
+  await Promise.all(pdrs.map((p) => partialRecordUpdate(esClient, p.pdrName, 'pdr', { status: 'failed', error })));
 }
 
 async function checkExecution(arn, url, timestamp, esClient) {
@@ -133,17 +130,11 @@ async function cleanup() {
 
   const limit = pLimit(2);
 
-  await Promise.all(
-    executions.slice(0, 400).map(
-      ex => limit(
-        () => checkExecution(ex.arn, ex.execution, ex.timestamp, esClient)
-      )
-    )
-  );
+  await Promise.all(executions.slice(0, 400).map((ex) => limit(() => checkExecution(ex.arn, ex.execution, ex.timestamp, esClient))));
 }
 
 function handler(event, context, cb) {
-  cleanup().then(() => cb()).catch(e => {
+  cleanup().then(() => cb()).catch((e) => {
     log.error(e);
     cb(e);
   });

diff --git a/packages/api/lambdas/kinesis-consumer.js b/packages/api/lambdas/kinesis-consumer.js
index c00da611b4f..acb7a4ba238 100644
--- a/packages/api/lambdas/kinesis-consumer.js
+++ b/packages/api/lambdas/kinesis-consumer.js
@@ -1,10 +1,13 @@
 /* eslint-disable require-yield */
+
 'use strict';
+
 const Ajv = require('ajv');
+const log = require('@cumulus/common/log');
 
 const Rule = require('../models/rules');
 const messageSchema = require('./kinesis-consumer-event-schema.json');
-const { queueWorkflowMessage } = require('@cumulus/ingest/queue');
+const sfSchedule = require('./sf-scheduler');
 
 /**
  * `getKinesisRules` scans and returns DynamoDB rules table for enabled,
@@ -14,7 +17,7 @@ const { queueWorkflowMessage } = require('@cumulus/ingest/queue');
  * @returns {Array} List of zero or more rules found from table scan
  */
 async function getKinesisRules(event) {
-  const collection = event.collection;
+  const { collection } = event;
   const model = new Rule();
   const kinesisRules = await model.scan({
     names: {
@@ -51,8 +54,11 @@ async function queueMessageForRule(kinesisRule, eventObject) {
     payload: eventObject
   };
 
-  return Rule.buildPayload(item)
-    .then(queueWorkflowMessage);
+  const payload = await Rule.buildPayload(item);
+  return new Promise((resolve, reject) => sfSchedule(payload, {}, (err, result) => {
+    if (err) reject(err);
+    resolve(result);
+  }));
 }
 
 /**
@@ -63,10 +69,10 @@ async function queueMessageForRule(kinesisRule, eventObject) {
  * @returns {(error|Object)} Throws an Ajv.ValidationError if event object is invalid.
  * Returns the event object if event is valid.
  */
-async function validateMessage(event) {
+function validateMessage(event) {
   const ajv = new Ajv({ allErrors: true });
   const validate = ajv.compile(messageSchema);
-  return await validate(event);
+  return validate(event);
 }
 
 /**
@@ -86,7 +92,12 @@ function processRecord(record) {
     .then(getKinesisRules)
     .then((kinesisRules) => (
       Promise.all(kinesisRules.map((kinesisRule) => queueMessageForRule(kinesisRule, eventObject)))
-    ));
+    ))
+    .catch((err) => {
+      log.error('Caught error in process record:');
+      log.error(err);
+      return err;
+    });
 }
 
 /**

diff --git a/packages/api/lambdas/sf-scheduler.js b/packages/api/lambdas/sf-scheduler.js
index 9b6b9257e41..a42615cf242 100644
--- a/packages/api/lambdas/sf-scheduler.js
+++ b/packages/api/lambdas/sf-scheduler.js
@@ -10,9 +10,10 @@ const { Provider, Collection } = require('../models');
  * Builds a cumulus-compatible message and adds it to the startSF queue
  * startSF queue will then start a stepfunction for the given message
  *
- * @param {object} event lambda input message
- * @param {object} context lambda context
- * @param {function} cb lambda callback
+ * @param {Object} event - lambda input message
+ * @param {Object} context - lambda context
+ * @param {function} cb - lambda callback
+ * @returns {function} Calls callback with result of SQS.sendMessage or error
  */
 function schedule(event, context, cb) {
   const template = get(event, 'template');
@@ -53,9 +54,11 @@ function schedule(event, context, cb) {
     .then((c) => {
       if (c) message.meta.collection = c;
     })
-    .then(() => SQS.sendMessage(message.meta.queues.startSF, message))
+    .then(() => {
+      return SQS.sendMessage(message.meta.queues.startSF, message);
+    })
     .then((r) => cb(null, r))
-    .catch(e => cb(e));
+    .catch((e) => cb(e));
 }
 
 module.exports = schedule;
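For reference, a hypothetical input event for `sf-scheduler#schedule`, mirroring the object `Rule.buildPayload` is stubbed to produce in the tests below. The S3 URI and names are placeholders, not real resources:

```js
// Hypothetical schedule() input; all values are placeholders.
const exampleEvent = {
  template: 's3://my-internal-bucket/workflows/template.json', // message template on S3
  provider: 'PROV1',             // resolved against the Providers table
  collection: 'test-collection', // resolved against the Collections table
  meta: {},
  payload: {}
};

const schedule = require('./sf-scheduler');

// schedule() loads the template, merges the provider and collection
// records into message.meta, and enqueues on message.meta.queues.startSF.
schedule(exampleEvent, {}, (err, result) => {
  if (err) console.error(err);
  else console.log('enqueued', result);
});
```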
diff --git a/packages/api/tests/test-kinesis-consumer.js b/packages/api/tests/test-kinesis-consumer.js
index 4b8579b78b6..feac904b977 100644
--- a/packages/api/tests/test-kinesis-consumer.js
+++ b/packages/api/tests/test-kinesis-consumer.js
@@ -1,14 +1,18 @@
 'use strict';
 
-const test = require('ava');
-const sinon = require('sinon');
 const get = require('lodash.get');
-const { createQueue, sqs, s3, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws');
-const { randomString } = require('@cumulus/common/test-utils');
+const sinon = require('sinon');
+const test = require('ava');
+
+const { randomString } = require('@cumulus/common/test-utils');
+const { SQS } = require('@cumulus/ingest/aws');
+const { s3, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws');
 const { getKinesisRules, handler } = require('../lambdas/kinesis-consumer');
+
 const manager = require('../models/base');
+const Collection = require('../models/collections');
 const Rule = require('../models/rules');
+const Provider = require('../models/providers');
 
 const testCollectionName = 'test-collection';
 
 const ruleTableParams = {
@@ -23,17 +27,20 @@ const eventData = JSON.stringify({
 
 const event = {
   Records: [
-    { kinesis: { data: new Buffer(eventData).toString('base64') } },
-    { kinesis: { data: new Buffer(eventData).toString('base64') } }
+    { kinesis: { data: Buffer.from(eventData).toString('base64') } },
+    { kinesis: { data: Buffer.from(eventData).toString('base64') } }
   ]
 };
 
+const collection = {
+  name: testCollectionName,
+  version: '0.0.0'
+};
+const provider = { id: 'PROV1' };
+
 const commonRuleParams = {
-  collection: {
-    name: testCollectionName,
-    version: '0.0.0'
-  },
-  provider: 'PROV1',
+  collection,
+  provider: provider.id,
   rule: {
     type: 'kinesis',
     value: 'test-kinesisarn'
@@ -69,36 +76,39 @@ function testCallback(err, object) {
   return object;
 }
 
+let sfSchedulerSpy;
+const stubQueueUrl = 'stubQueueUrl';
+
 test.beforeEach(async (t) => {
+  sfSchedulerSpy = sinon.stub(SQS, 'sendMessage').returns(true);
   t.context.templateBucket = randomString();
-  await s3().createBucket({ Bucket: t.context.templateBucket }).promise();
-
   t.context.stateMachineArn = randomString();
+
   const messageTemplateKey = `${randomString()}/template.json`;
-  t.context.queueUrl = await createQueue();
-
+  t.context.messageTemplateKey = messageTemplateKey;
   t.context.messageTemplate = {
     cumulus_meta: { state_machine: t.context.stateMachineArn },
-    meta: { queues: { startSF: t.context.queueUrl } }
+    meta: { queues: { startSF: stubQueueUrl } }
   };
-  const messageTemplateKey = `${randomString()}/template.json`;
+
+  await s3().createBucket({ Bucket: t.context.templateBucket }).promise();
   await s3().putObject({
     Bucket: t.context.templateBucket,
     Key: messageTemplateKey,
     Body: JSON.stringify(t.context.messageTemplate)
   }).promise();
 
-  sinon.stub(Rule, 'buildPayload').callsFake((item) =>
-    Promise.resolve({
-      template: `s3://${t.context.templateBucket}/${messageTemplateKey}`,
-      provider: item.provider,
-      collection: item.collection,
-      meta: get(item, 'meta', {}),
-      payload: get(item, 'payload', {})
-    })
-  );
+  sinon.stub(Rule, 'buildPayload').callsFake((item) => Promise.resolve({
+    template: `s3://${t.context.templateBucket}/${messageTemplateKey}`,
+    provider: item.provider,
+    collection: item.collection,
+    meta: get(item, 'meta', {}),
+    payload: get(item, 'payload', {})
+  }));
+  sinon.stub(Provider.prototype, 'get').returns(provider);
+  sinon.stub(Collection.prototype, 'get').returns(collection);
 
   t.context.tableName = randomString();
   process.env.RulesTable = t.context.tableName;
@@ -115,16 +125,18 @@ test.beforeEach(async (t) => {
 test.afterEach(async (t) => {
   await Promise.all([
     recursivelyDeleteS3Bucket(t.context.templateBucket),
-    sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise(),
     manager.deleteTable(t.context.tableName)
   ]);
+  sfSchedulerSpy.restore();
   Rule.buildPayload.restore();
+  Provider.prototype.get.restore();
+  Collection.prototype.get.restore();
 });
 
 // getKinesisRule tests
 // eslint-disable-next-line max-len
-test('it should look up kinesis-type rules which are associated with the collection, but not those that are disabled', (t) => {
-  return getKinesisRules(JSON.parse(eventData))
+test('it should look up kinesis-type rules which are associated with the collection, but not those that are disabled', async (t) => {
+  await getKinesisRules(JSON.parse(eventData))
     .then((result) => {
       t.is(result.length, 2);
     });
 });
 // handler tests
 test('it should enqueue a message for each associated workflow', async (t) => {
   await handler(event, {}, testCallback);
-  await sqs().receiveMessage({
-    QueueUrl: t.context.queueUrl,
-    MaxNumberOfMessages: 10,
-    WaitTimeSeconds: 1
-  }).promise()
-    .then((receiveMessageResponse) => {
-      t.is(receiveMessageResponse.Messages.length, 4);
-      receiveMessageResponse.Messages.map((message) => (
-        t.is(JSON.stringify(JSON.parse(message.Body).payload), JSON.stringify({ collection: 'test-collection' }))
-      ));
-    });
+  const actualQueueUrl = sfSchedulerSpy.getCall(0).args[0];
+  t.is(actualQueueUrl, stubQueueUrl);
+  const actualMessage = sfSchedulerSpy.getCall(0).args[1];
+  const expectedMessage = {
+    cumulus_meta: {
+      state_machine: t.context.stateMachineArn
+    },
+    meta: {
+      queues: { startSF: stubQueueUrl },
+      provider,
+      collection
+    },
+    payload: {
+      collection: 'test-collection'
+    }
+  };
+  t.is(actualMessage.cumulus_meta.state_machine, expectedMessage.cumulus_meta.state_machine);
+  t.deepEqual(actualMessage.meta, expectedMessage.meta);
+  t.deepEqual(actualMessage.payload, expectedMessage.payload);
 });
 
-test('it should throw an error if message does not include a collection', (t) => {
+test('it should throw an error if message does not include a collection', async (t) => {
   const invalidMessage = JSON.stringify({});
   const kinesisEvent = {
-    Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }]
+    Records: [{ kinesis: { data: Buffer.from(invalidMessage).toString('base64') } }]
   };
-  return handler(kinesisEvent, {}, testCallback)
-    .catch((err) => {
-      const errObject = JSON.parse(err);
-      t.is(errObject.errors[0].dataPath, '');
-      t.is(errObject.errors[0].message, 'should have required property \'collection\'');
-    });
+  const errors = await handler(kinesisEvent, {}, testCallback);
+  t.is(errors[0].message, 'validation failed');
+  t.is(errors[0].errors[0].dataPath, '');
+  t.is(errors[0].errors[0].message, 'should have required property \'collection\'');
 });
 
-test('it should throw an error if message collection has wrong data type', (t) => {
+test('it should throw an error if message collection has wrong data type', async (t) => {
   const invalidMessage = JSON.stringify({ collection: {} });
   const kinesisEvent = {
-    Records: [{ kinesis: { data: new Buffer(invalidMessage).toString('base64') } }]
+    Records: [{ kinesis: { data: Buffer.from(invalidMessage).toString('base64') } }]
   };
-  return handler(kinesisEvent, {}, testCallback)
-    .catch((err) => {
-      const errObject = JSON.parse(err);
-      t.is(errObject.errors[0].dataPath, '.collection');
-      t.is(errObject.errors[0].message, 'should be string');
-    });
+  const errors = await handler(kinesisEvent, {}, testCallback);
+  t.is(errors[0].message, 'validation failed');
+  t.is(errors[0].errors[0].dataPath, '.collection');
+  t.is(errors[0].errors[0].message, 'should be string');
 });
 
 test('it should not throw if message is valid', (t) => {
   const validMessage = JSON.stringify({ collection: 'confection-collection' });
   const kinesisEvent = {
-    Records: [{ kinesis: { data: new Buffer(validMessage).toString('base64') } }]
+    Records: [{ kinesis: { data: Buffer.from(validMessage).toString('base64') } }]
   };
   return handler(kinesisEvent, {}, testCallback).then((r) => t.deepEqual(r, [[]]));
 });
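The `dataPath` and `message` values asserted above come straight from Ajv. A standalone sketch of that validation, using an inline schema as a simplified stand-in for `kinesis-consumer-event-schema.json` (which is assumed to require a string `collection`):

```js
const Ajv = require('ajv');

// Simplified stand-in for kinesis-consumer-event-schema.json.
const schema = {
  type: 'object',
  properties: { collection: { type: 'string' } },
  required: ['collection']
};

const ajv = new Ajv({ allErrors: true });
const validate = ajv.compile(schema);

validate({}); // invalid: collection missing
console.log(validate.errors[0].dataPath); // ''
console.log(validate.errors[0].message);  // "should have required property 'collection'"

validate({ collection: {} }); // invalid: wrong type
console.log(validate.errors[0].dataPath); // '.collection'
console.log(validate.errors[0].message);  // 'should be string'
```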
diff --git a/packages/ingest/README.md b/packages/ingest/README.md
index 85e685b0f8a..a1ba7a25751 100644
--- a/packages/ingest/README.md
+++ b/packages/ingest/README.md
@@ -16,6 +16,16 @@ Cumulus is a cloud-based data ingest, archive, distribution and management prototype
 ```
 npm install @cumulus/ingest
 ```
 
+## Testing
+
+Running tests locally requires [localstack](https://github.com/localstack/localstack).
+
+With localstack running, you can run tests using:
+
+```
+LOCALSTACK_HOST=localhost npm test
+```
+
 ## Modules
 
 All modules are accessible using require: `require('@cumulus/ingest/')` or import: `import from '@cumulus/ingest/'`.

diff --git a/packages/ingest/aws.js b/packages/ingest/aws.js
index e24c95297a9..a39f6c67e3e 100644
--- a/packages/ingest/aws.js
+++ b/packages/ingest/aws.js
@@ -214,14 +214,12 @@ class S3 {
   }
 
   static async get(bucket, key) {
-    const s3 = new AWS.S3();
-
     const params = {
       Bucket: bucket,
       Key: key
     };
 
-    return s3.getObject(params).promise();
+    return aws.s3().getObject(params).promise();
   }
 
   static async upload(bucket, key, body, acl = 'private') {

diff --git a/packages/ingest/queue.js b/packages/ingest/queue.js
index 702f2789d48..7e5790a1823 100644
--- a/packages/ingest/queue.js
+++ b/packages/ingest/queue.js
@@ -5,8 +5,6 @@ const {
   sendSQSMessage,
   parseS3Uri
 } = require('@cumulus/common/aws');
-const get = require('lodash.get');
-const uuidv4 = require('uuid/v4');
 
 /**
  * Create a message from a template stored on S3
@@ -37,7 +35,8 @@ async function enqueueParsePdrMessage(
   queueUrl,
   parsePdrMessageTemplateUri,
   provider,
-  collection) {
+  collection
+) {
   const message = await getMessageFromTemplate(parsePdrMessageTemplateUri);
 
   message.meta.provider = provider;
@@ -87,27 +86,3 @@ async function enqueueGranuleIngestMessage(
   return sendSQSMessage(queueUrl, message);
 }
 exports.enqueueGranuleIngestMessage = enqueueGranuleIngestMessage;
-
-/**
- * Queue a workflow to be picked up by SF starter
- *
- * @param {Object} event - event to queue with workflow and payload info
- * @returns {Promise} - resolves when the message has been enqueued
- */
-async function queueWorkflowMessage(event) {
-  const template = get(event, 'template');
-  const provider = get(event, 'provider', {});
-  const collection = get(event, 'collection', {});
-  const payload = get(event, 'payload', {});
-
-  const message = await getMessageFromTemplate(template);
-
-  message.meta.provider = provider;
-  message.meta.collection = collection;
-
-  message.payload = payload;
-  message.cumulus_meta.execution_name = uuidv4();
-
-  return sendSQSMessage(message.meta.queues.startSF, message);
-}
-exports.queueWorkflowMessage = queueWorkflowMessage;

diff --git a/packages/ingest/test/queue.js b/packages/ingest/test/queue.js
index 4ad64c9ea89..0d1db238e6c 100644
--- a/packages/ingest/test/queue.js
+++ b/packages/ingest/test/queue.js
@@ -5,7 +5,7 @@ const queue = require('../queue');
 const { createQueue, sqs, s3, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws');
 const { randomString } = require('@cumulus/common/test-utils');
 
-test.beforeEach(async (t) => {
+test.beforeEach(async(t) => {
   t.context.templateBucket = randomString();
   await s3().createBucket({ Bucket: t.context.templateBucket }).promise();
 
   t.context.stateMachineArn = randomString();
 
   t.context.queueUrl = await createQueue();
 
   t.context.messageTemplate = {
     cumulus_meta: {
       state_machine: t.context.stateMachineArn
     },
     meta: { queues: { startSF: t.context.queueUrl } }
   };
 
   const messageTemplateKey = `${randomString()}/template.json`;
+  t.context.messageTemplateKey = messageTemplateKey;
   await s3().putObject({
     Bucket: t.context.templateBucket,
     Key: messageTemplateKey,
@@ -30,34 +31,42 @@ test.beforeEach(async (t) => {
   t.context.template = `s3://${t.context.templateBucket}/${messageTemplateKey}`;
 });
 
-test.afterEach(async (t) => {
+test.afterEach(async(t) => {
   await Promise.all([
     recursivelyDeleteS3Bucket(t.context.templateBucket),
     sqs().deleteQueue({ QueueUrl: t.context.queueUrl
}).promise() ]); }); -test('the queue receives a correctly formatted workflow message', async (t) => { - const event = { - template: t.context.template, - provider: 'PROV1', - collection: { name: 'test-collection' }, - payload: { test: 'test payload' } - }; +test('the queue receives a correctly formatted workflow message', async(t) => { + const granule = { granuleId: '1', files: [] }; + const { queueUrl } = t.context; + const templateUri = `s3://${t.context.templateBucket}/${t.context.messageTemplateKey}`; + const collection = { name: 'test-collection', version: '0.0.0' }; + const provider = { id: 'test-provider' }; - await queue.queueWorkflowMessage(event); + await queue.enqueueGranuleIngestMessage(granule, queueUrl, templateUri, provider, collection); await sqs().receiveMessage({ QueueUrl: t.context.queueUrl, MaxNumberOfMessages: 10, WaitTimeSeconds: 1 }).promise() - .then((receiveMessageResponse) => { - t.is(receiveMessageResponse.Messages.length, 1); - - const message = JSON.parse(receiveMessageResponse.Messages[0].Body); - t.is(message.meta.provider, 'PROV1'); - t.is(JSON.stringify(message.meta.collection), JSON.stringify({ name: 'test-collection' })); - t.is(JSON.stringify(message.payload), JSON.stringify({ test: 'test payload' })); - t.is(message.cumulus_meta.state_machine, t.context.stateMachineArn); - }); + .then((receiveMessageResponse) => { + t.is(receiveMessageResponse.Messages.length, 1); + + const actualMessage = JSON.parse(receiveMessageResponse.Messages[0].Body); + const expectedMessage = { + cumulus_meta: { + state_machine: t.context.stateMachineArn + }, + meta: { + queues: { startSF: t.context.queueUrl }, + provider: provider, + collection: collection + }, + payload: { granules: [granule] } + }; + + t.deepEqual(expectedMessage, actualMessage); + }); });
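With `queueWorkflowMessage` removed, `enqueueGranuleIngestMessage` is the helper the final test above exercises. A hypothetical usage sketch, with placeholder bucket and queue values:

```js
const { enqueueGranuleIngestMessage } = require('@cumulus/ingest/queue');

async function queueGranule() {
  const granule = { granuleId: 'granule-1', files: [] };
  const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/startSF'; // placeholder
  const templateUri = 's3://my-internal-bucket/workflows/IngestGranule.json';  // placeholder
  const provider = { id: 'PROV1' };
  const collection = { name: 'test-collection', version: '0.0.0' };

  // Builds a message from the S3 template, sets meta.provider and
  // meta.collection, wraps the granule in payload.granules, and sends
  // the message to the given queue.
  return enqueueGranuleIngestMessage(granule, queueUrl, templateUri, provider, collection);
}
```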