diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bb1f59ed4d..a3ac839934c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,22 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] ### Added - - Integration test package with command line tool [CUMULUS-200] by @laurenfrederick +- Added a `jlog` function to `common/test-utils` to aid in test debugging +- Integration test package with command line tool [CUMULUS-200] by @laurenfrederick + +### Updated +- The `queue-pdrs` task now uses the [cumulus-message-adapter-js](https://github.com/cumulus-nasa/cumulus-message-adapter-js) + library +- Updated the `queue-pdrs` JSON schemas +- The test-utils schema validation functions now throw an error if validation + fails +- The `queue-granules` task now uses the [cumulus-message-adapter-js](https://github.com/cumulus-nasa/cumulus-message-adapter-js) + library +- Updated the `queue-granules` JSON schemas + +### Removed +- Removed the `getSfnExecutionByName` function from `common/aws` +- Removed the `getGranuleStatus` function from `common/aws` ## [v1.0.1] - 2018-02-27 diff --git a/cumulus/tasks/queue-pdrs/index.js b/cumulus/tasks/queue-pdrs/index.js index d55ce75a95e..cd088159c06 100644 --- a/cumulus/tasks/queue-pdrs/index.js +++ b/cumulus/tasks/queue-pdrs/index.js @@ -1,42 +1,41 @@ 'use strict'; -import { queuePdr } from '@cumulus/ingest/queue'; -const log = require('@cumulus/common/log'); +const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js'); +const { enqueueParsePdrMessage } = require('@cumulus/ingest/queue'); /** -* Callback function provided by aws lambda. See https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-handler.html#nodejs-prog-model-handler-callback -* @callback lambdaCallback -* @param {object} error -* @param {object} output - output object matching schemas/output.json -* @param {integer} output.pdrs_queued -*/ - -/** -* For each PDR, generate a new SF messages send to the step function queue to be executed -* @param {object} event lambda event object -* @param {object} event.input -* @param {array} event.input.pdrs -* @param {object} context Lambda context object. See https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-context.html -* @param {lambdaCallback} callback callback function -* @return {undefined} +* See schemas/input.json and schemas/config.json for detailed event description +* +* @param {Object} event - Lambda event object +* @returns {Promise} - see schemas/output.json for detailed output schema +* that is passed to the next task in the workflow **/ -function handler(event, context, cb) { +async function queuePdrs(event) { const pdrs = event.input.pdrs || []; - const config = event.config; - const queuedPdrs = pdrs.map((pdr) => queuePdr( - config.queueUrl, - config.templateUri, - config.provider, - config.collection, - pdr - )); - return Promise.all(queuedPdrs).then(() => { - cb(null, { pdrs_queued: queuedPdrs.length }); - }).catch(e => { - log.error(e); - return cb(e); - }); + await Promise.all( + pdrs.map((pdr) => enqueueParsePdrMessage( + pdr, + event.config.queueUrl, + event.config.parsePdrMessageTemplateUri, + event.config.provider, + event.config.collection + )) + ); + + return { pdrs_queued: pdrs.length }; } +exports.queuePdrs = queuePdrs; -module.exports.handler = handler; +/** + * Lambda handler + * + * @param {Object} event - a Cumulus Message + * @param {Object} context - an AWS Lambda context + * @param {Function} callback - an AWS Lambda handler + * @returns {undefined} - does not return a value + */ +function handler(event, context, callback) { + cumulusMessageAdapter.runCumulusTask(queuePdrs, event, context, callback); +} +exports.handler = handler; diff --git a/cumulus/tasks/queue-pdrs/package.json b/cumulus/tasks/queue-pdrs/package.json index 6a195de0a3e..e1b7feb8b77 100644 --- a/cumulus/tasks/queue-pdrs/package.json +++ b/cumulus/tasks/queue-pdrs/package.json @@ -38,6 +38,7 @@ "license": "Apache-2.0", "dependencies": { "@cumulus/common": "^1.0.1", + "@cumulus/cumulus-message-adapter-js": "0.0.1-beta.3", "@cumulus/ingest": "^1.0.1", "babel-core": "^6.25.0", "babel-loader": "^6.2.4", diff --git a/cumulus/tasks/queue-pdrs/schemas/config.json b/cumulus/tasks/queue-pdrs/schemas/config.json new file mode 100644 index 00000000000..84e11e636d8 --- /dev/null +++ b/cumulus/tasks/queue-pdrs/schemas/config.json @@ -0,0 +1,18 @@ +{ + "title": "QueuePdrsConfig", + "description": "Describes the config used by the queue-pdrs task", + "type": "object", + "required": [ + "collection", + "provider", + "queueUrl", + "parsePdrMessageTemplateUri" + ], + "additionalProperties": false, + "properties": { + "collection": { "type": "object" }, + "provider": { "type": "object" }, + "queueUrl": { "type": "string" }, + "parsePdrMessageTemplateUri": { "type": "string" } + } +} diff --git a/cumulus/tasks/queue-pdrs/schemas/config.json.txt b/cumulus/tasks/queue-pdrs/schemas/config.json.txt deleted file mode 100644 index 99a1737194f..00000000000 --- a/cumulus/tasks/queue-pdrs/schemas/config.json.txt +++ /dev/null @@ -1,41 +0,0 @@ -{ - "title": "QueuePdrsConfig", - "description": "Describes the config used by the queue-pdrs task", - "type": "object", - "required": [], - "properties": { - "buckets": { - "type": "object", - "description": "aws s3 buckets used by this task", - "properties": { - "internal": { "type": "string" }, - "private": { "type": "string" }, - "protected": { "type": "string" }, - "public": { "type": "string" } - } - }, - "queues": { - "type": "object", - "properties": { - "startSF": { "type": "string" } - } - }, - "templates": { - "type": "object" - }, - "cumulus_meta": { - "type": "object", - "properties": { - "config": { - "type": "object", - "properties": { - "next": { "type": "string" } - } - } - } - }, - "collection": { - "type": "object" - } - } -} diff --git a/cumulus/tasks/queue-pdrs/schemas/input.json.txt b/cumulus/tasks/queue-pdrs/schemas/input.json similarity index 100% rename from cumulus/tasks/queue-pdrs/schemas/input.json.txt rename to cumulus/tasks/queue-pdrs/schemas/input.json diff --git a/cumulus/tasks/queue-pdrs/schemas/output.json.txt b/cumulus/tasks/queue-pdrs/schemas/output.json similarity index 75% rename from cumulus/tasks/queue-pdrs/schemas/output.json.txt rename to cumulus/tasks/queue-pdrs/schemas/output.json index 45cdd1d3798..a74f4c5f114 100644 --- a/cumulus/tasks/queue-pdrs/schemas/output.json.txt +++ b/cumulus/tasks/queue-pdrs/schemas/output.json @@ -2,6 +2,8 @@ "title": "QueuePdrsOutput", "description": "Describes the output produced by the queue-pdrs task", "type": "object", + "required": [ "pdrs_queued" ], + "additionalProperties": false, "properties": { "pdrs_queued": { "type": "integer" diff --git a/cumulus/tasks/queue-pdrs/tests/.eslintrc.json b/cumulus/tasks/queue-pdrs/tests/.eslintrc.json new file mode 100644 index 00000000000..ada42bca77f --- /dev/null +++ b/cumulus/tasks/queue-pdrs/tests/.eslintrc.json @@ -0,0 +1,5 @@ +{ + "rules": { + "no-param-reassign": "off" + } +} diff --git a/cumulus/tasks/queue-pdrs/tests/index.js b/cumulus/tasks/queue-pdrs/tests/index.js index 3f659a699e3..6f94fa69b35 100644 --- a/cumulus/tasks/queue-pdrs/tests/index.js +++ b/cumulus/tasks/queue-pdrs/tests/index.js @@ -1,53 +1,170 @@ -/* eslint-disable no-param-reassign */ 'use strict'; const test = require('ava'); -const MockAWS = require('@mapbox/mock-aws-sdk-js'); const { s3, sqs, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws'); -const { createQueue, randomString } = require('@cumulus/common/test-utils'); +const { + createQueue, + randomString, + validateConfig, + validateInput, + validateOutput +} = require('@cumulus/common/test-utils'); -const { handler } = require('../index'); -const inputJSON = require('./fixtures/input.json'); -const workflowTemplate = require('./fixtures/workflow-template.json'); - -const aws = require('@cumulus/common/aws'); +const { queuePdrs } = require('../index'); test.beforeEach(async (t) => { - t.context.queueUrl = await createQueue(); + t.context.templateBucket = randomString(); + await s3().createBucket({ Bucket: t.context.templateBucket }).promise(); + + t.context.stateMachineArn = randomString(); + + t.context.messageTemplate = { + cumulus_meta: { + state_machine: t.context.stateMachineArn + }, + meta: {} + }; + const messageTemplateKey = `${randomString()}/template.json`; + await s3().putObject({ + Bucket: t.context.templateBucket, + Key: messageTemplateKey, + Body: JSON.stringify(t.context.messageTemplate) + }).promise(); + + t.context.event = { + config: { + collection: { name: 'collection-name' }, + provider: { name: 'provider-name' }, + queueUrl: await createQueue(), + parsePdrMessageTemplateUri: `s3://${t.context.templateBucket}/${messageTemplateKey}` + }, + input: { + pdrs: [] + } + }; +}); + +test.afterEach(async (t) => { + await Promise.all([ + recursivelyDeleteS3Bucket(t.context.templateBucket), + sqs().deleteQueue({ QueueUrl: t.context.event.config.queueUrl }).promise() + ]); +}); + +test('The correct output is returned when PDRs are queued', async (t) => { + const event = t.context.event; + event.input.pdrs = [ + { name: randomString(), path: randomString() }, + { name: randomString(), path: randomString() } + ]; + + await validateConfig(t, event.config); + await validateInput(t, event.input); + + const output = await queuePdrs(event); + + await validateOutput(t, output); + t.deepEqual(output, { pdrs_queued: 2 }); +}); + +test('The correct output is returned when no PDRs are queued', async (t) => { + const event = t.context.event; + event.input.pdrs = []; + + await validateConfig(t, event.config); + await validateInput(t, event.input); - t.context.bucket = randomString(); - return s3().createBucket({ Bucket: t.context.bucket }).promise(); + const output = await queuePdrs(event); + + await validateOutput(t, output); + t.deepEqual(output, { pdrs_queued: 0 }); }); -test.afterEach.always((t) => - Promise.all([ - recursivelyDeleteS3Bucket(t.context.bucket), - sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise() - ])); +test('PDRs are added to the queue', async (t) => { + const event = t.context.event; + event.input.pdrs = [ + { name: randomString(), path: randomString() }, + { name: randomString(), path: randomString() } + ]; + + await validateConfig(t, event.config); + await validateInput(t, event.input); -test('queue pdrs', async (t) => { - const Bucket = t.context.bucket; - const ParsePdrTemplate = `s3://${Bucket}/dev/workflows/ParsePdr.json`; + const output = await queuePdrs(event); - await aws.s3().putObject({ - Bucket, - Key: 'dev/workflows/ParsePdr.json', - Body: JSON.stringify(workflowTemplate) + await validateOutput(t, output); + + // Get messages from the queue + const receiveMessageResponse = await sqs().receiveMessage({ + QueueUrl: t.context.event.config.queueUrl, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 1 }).promise(); + const messages = receiveMessageResponse.Messages; - MockAWS.stub('StepFunctions', 'describeExecution').returns({ - promise: () => Promise.resolve({}) - }); + t.is(messages.length, 2); +}); + +test('The correct message is enqueued', async (t) => { + const event = t.context.event; + event.input.pdrs = [ + { + name: randomString(), + path: randomString() + }, + { + name: randomString(), + path: randomString() + }, + ]; + + await validateConfig(t, event.config); + await validateInput(t, event.input); + + const output = await queuePdrs(event); - const input = Object.assign({}, inputJSON); - input.config.templateUri = ParsePdrTemplate; - input.config.queueUrl = t.context.queueUrl; + await validateOutput(t, output); - return handler(input, {}, (e, output) => { - t.ifError(e); - t.is(typeof output, 'object'); - t.is(output.pdrs_queued, 2); - MockAWS.StepFunctions.restore(); + // Get messages from the queue + const receiveMessageResponse = await sqs().receiveMessage({ + QueueUrl: t.context.event.config.queueUrl, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 1 + }).promise(); + const messages = receiveMessageResponse.Messages.map((message) => JSON.parse(message.Body)); + + t.is(messages.length, 2); + + const receivedPdrnames = messages.map((message) => message.payload.pdr.name); + event.input.pdrs.map((pdr) => pdr.name).forEach((pdrName) => + t.true(receivedPdrnames.includes(pdrName))); + + // Figure out what messages we should have received for each PDR + const expectedMessages = {}; + event.input.pdrs.forEach((pdr) => { + expectedMessages[pdr.name] = { + cumulus_meta: { + state_machine: t.context.stateMachineArn + }, + meta: { + collection: { name: 'collection-name' }, + provider: { name: 'provider-name' } + }, + payload: { + pdr: { + name: pdr.name, + path: pdr.path + } + } + }; + }); + + // Make sure we did receive those messages + messages.forEach((message) => { + const pdrName = message.payload.pdr.name; + t.deepEqual(message, expectedMessages[pdrName]); }); }); + +test.todo('An appropriate error is thrown if the message template could not be fetched'); diff --git a/packages/common/test-utils.js b/packages/common/test-utils.js index 71f67cb9172..66bcc5c125a 100644 --- a/packages/common/test-utils.js +++ b/packages/common/test-utils.js @@ -131,7 +131,8 @@ async function validateJSON(t, schemaFilename, data) { const message = `${schemaName} validation failed: ${ajv.errorsText()}`; console.log(message); console.log(JSON.stringify(data, null, 2)); - return t.fail(message); + t.fail(message); + throw new Error(message); } return valid; } diff --git a/packages/ingest/queue.js b/packages/ingest/queue.js index b045a249f49..7990c27120f 100644 --- a/packages/ingest/queue.js +++ b/packages/ingest/queue.js @@ -1,6 +1,5 @@ 'use strict'; -const uuidv4 = require('uuid/v4'); const { getS3Object, sendSQSMessage, @@ -20,24 +19,33 @@ async function getMessageFromTemplate(templateUri) { } /** - * Create a message from a template stored on S3 - * - * @param {string} queueUrl - The SQS url - * @param {string} templateUri - S3 uri to the workflow template - * @param {Object} provider - Cumulus provider object - * @param {Object} collection - Cumulus collection object - * @param {Object} pdr - the PDR object - * @param {string} pdr.name - name of the PDR - * @returns {Promise} promise returned from SQS.sendMessage() - **/ -async function queuePdr(queueUrl, templateUri, provider, collection, pdr) { - const message = await getMessageFromTemplate(templateUri, provider, collection); + * Enqueue a PDR to be parsed + * + * @param {Object} pdr - the PDR to be enqueued for parsing + * @param {string} queueUrl - the SQS queue to add the message to + * @param {string} parsePdrMessageTemplateUri - the S3 URI of template for + * a granule ingest message + * @param {Object} provider - the provider config to be attached to the message + * @param {Object} collection - the collection config to be attached to the + * message + * @returns {Promise} - resolves when the message has been enqueued + */ +async function enqueueParsePdrMessage( + pdr, + queueUrl, + parsePdrMessageTemplateUri, + provider, + collection) { + const message = await getMessageFromTemplate(parsePdrMessageTemplateUri); + + message.meta.provider = provider; + message.meta.collection = collection; message.payload = { pdr }; - message.cumulus_meta.execution_name = uuidv4(); return sendSQSMessage(queueUrl, message); } +module.exports.enqueueParsePdrMessage = enqueueParsePdrMessage; /** * Enqueue a granule to be ingested @@ -77,5 +85,3 @@ async function enqueueGranuleIngestMessage( return sendSQSMessage(queueUrl, message); } exports.enqueueGranuleIngestMessage = enqueueGranuleIngestMessage; - -module.exports.queuePdr = queuePdr;