diff --git a/.circleci/config.yml b/.circleci/config.yml index f225ec8357f..97bc5f8d4da 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -123,11 +123,17 @@ jobs: - ./cumulus/services/sfn-throttler/node_modules - run: - name: Running Test + name: Running Tests environment: LOCALSTACK_HOST: localstack command: yarn test + - run: + name: Running End to End Tests + environment: + LOCALSTACK_HOST: localstack + command: yarn e2e + build_and_publish: docker: - image: circleci/node:6.10 diff --git a/CHANGELOG.md b/CHANGELOG.md index fba62c472e8..4e4ea54c130 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Added +- added tools to @cumulus/integration-tests for local integration testing +- added end to end testing for discovering and parsing of PDRs +- `yarn e2e` command is available for end to end testing ### Fixed - **CUMULUS-175: "Dashboard providers not in sync with AWS providers."** The root cause of this bug - DynamoDB operations not showing up in Elasticsearch - was shared by collections and rules. The fix was to update providers', collections' and rules; POST, PUT and DELETE endpoints to operate on DynamoDB and using DynamoDB streams to update Elasticsearch. The following packages were made: diff --git a/README.md b/README.md index fefabe9d533..8d5afe9d4f7 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,10 @@ Run the test commands next $ yarn test +Run end to end tests by + + $ yarn e2e + ## Adding New Packages Create a new folder under `packages` if it is a common library or create folder under `cumulus/tasks` if it is a lambda task. `cd` to the folder and run `npm init`. 
diff --git a/package.json b/package.json index f29f79b44ba..43ea36231c9 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,7 @@ "version": "1.0.0", "description": "Cumulus Framework for ingesting and processing Nasa Earth data streams", "scripts": { + "e2e": "env TEST=true ava tests/*.js --serial", "test": "lerna run test", "bootstrap": "lerna bootstrap", "ybootstrap": "lerna bootstrap --npm-client=yarn", @@ -17,6 +18,14 @@ "type": "git", "url": "https://github.com/cumulus-nasa/cumulus" }, + "ava": { + "files": "test", + "babel": "inherit", + "require": [ + "babel-polyfill", + "babel-register" + ] + }, "keywords": [ "GIBS", "CUMULUS", @@ -33,13 +42,14 @@ "author": "Cumulus Authors", "license": "Apache-2.0", "devDependencies": { - "ava": "^0.24.0", + "ava": "^0.25.0", "babel-core": "^6.13.2", "babel-eslint": "^6.1.2", "babel-loader": "^6.2.4", "babel-plugin-transform-async-to-generator": "^6.8.0", "babel-polyfill": "^6.13.0", "babel-preset-es2015": "^6.13.2", + "babel-preset-es2017": "^6.24.1", "copy-webpack-plugin": "^4.0.1", "eslint": "^3.2.2", "eslint-config-airbnb": "^10.0.0", @@ -55,5 +65,8 @@ "transform-loader": "^0.2.3", "webpack": "^1.13.3", "webpack-node-externals": "^1.5.4" + }, + "dependencies": { + "fs-extra": "^5.0.0" } } diff --git a/packages/common/aws.js b/packages/common/aws.js index 5477bbd89c7..c6fe7460e7c 100644 --- a/packages/common/aws.js +++ b/packages/common/aws.js @@ -500,6 +500,52 @@ exports.sendSQSMessage = (queueUrl, message) => { }).promise(); }; +/** + * Receives SQS messages from a given queue. The number of messages received + * can be set and the timeout is also adjustable. 
+ * + * @param {string} queueUrl - url of the SQS queue + * @param {integer} numOfMessages - number of messages to read from the queue + * @param {integer} timeout - number of seconds it takes for a message to timeout + * @returns {Promise.<Array>} an array of messages + */ +exports.receiveSQSMessages = async (queueUrl, numOfMessages = 1, timeout = 30) => { + const params = { + QueueUrl: queueUrl, + AttributeNames: ['All'], + VisibilityTimeout: timeout, + MaxNumberOfMessages: numOfMessages + }; + + const messages = await exports.sqs().receiveMessage(params).promise(); + + // convert body from string to js object + if (Object.prototype.hasOwnProperty.call(messages, 'Messages')) { + messages.Messages.forEach((mes) => { + mes.Body = JSON.parse(mes.Body); // eslint-disable-line no-param-reassign + }); + + return messages.Messages; + } + return []; +}; + +/** + * Delete a given SQS message from a given queue. + * + * @param {string} queueUrl - url of the SQS queue + * @param {integer} receiptHandle - the unique identifier of the SQS message + * @returns {Promise} an AWS SQS response + */ +exports.deleteSQSMessage = (queueUrl, receiptHandle) => { + const params = { + QueueUrl: queueUrl, + ReceiptHandle: receiptHandle + }; + + return exports.sqs().deleteMessage(params).promise(); +}; + /** * Returns execution ARN from a statement machine Arn and executionName * diff --git a/packages/ingest/consumer.js b/packages/ingest/consumer.js index da7db0d8c4b..e9c11214d0c 100644 --- a/packages/ingest/consumer.js +++ b/packages/ingest/consumer.js @@ -1,7 +1,7 @@ 'use strict'; const log = require('@cumulus/common/log'); -const aws = require('./aws'); +const { receiveSQSMessages, deleteSQSMessage } = require('@cumulus/common/aws'); class Consume { constructor(queueUrl, messageLimit = 1, timeLimit = 90) { @@ -15,7 +15,7 @@ class Consume { async processMessage(message, fn) { try { await fn(message); - await aws.SQS.deleteMessage(this.queueUrl, message.ReceiptHandle); + await 
deleteSQSMessage(this.queueUrl, message.ReceiptHandle); } catch (e) { log.error(e); @@ -25,7 +25,7 @@ class Consume { async processMessages(fn, messageLimit) { let counter = 0; while (!this.endConsume) { - const messages = await aws.SQS.receiveMessage(this.queueUrl, messageLimit); + const messages = await receiveSQSMessages(this.queueUrl, messageLimit); counter += messages.length; if (messages.length > 0) { diff --git a/packages/integration-tests/local.js b/packages/integration-tests/local.js new file mode 100644 index 00000000000..e28bd51e207 --- /dev/null +++ b/packages/integration-tests/local.js @@ -0,0 +1,168 @@ +/** + * Includes helper functions for replicating Step Function Workflows + * locally + */ +'use strict'; + +const path = require('path'); +const fs = require('fs-extra'); +const clone = require('lodash.clonedeep'); +const { randomString } = require('@cumulus/common/test-utils'); +const { template } = require('@cumulus/deployment/lib/message'); +const { fetchMessageAdapter } = require('@cumulus/deployment/lib/adapter'); + +/** + * Download cumulus message adapter (CMA) and unzip it + * + * @param {string} version - cumulus message adapter version number (optional) + * @returns {Promise.} an object with path to the zip and extracted CMA + */ +async function downloadCMA(version) { + // download and unzip the message adapter + const gitPath = 'cumulus-nasa/cumulus-message-adapter'; + const filename = 'cumulus-message-adapter.zip'; + const src = path.join(process.cwd(), 'tests', `${randomString()}.zip`); + const dest = path.join(process.cwd(), 'tests', randomString()); + await fetchMessageAdapter(version, gitPath, filename, src, dest); + return { + src, + dest + }; +} + +/** + * Copy cumulus message adapter python folder to each task + * in the workflow + * + * @param {Object} workflow - a test workflow object + * @param {string} src - the path to the cumulus message adapter folder + * @param {string} cmaFolder - the name of the folder where CMA is copied 
to + * @returns {Promise.} an array of undefined values + */ +function copyCMAToTasks(workflow, src, cmaFolder) { + return Promise.all( + workflow.steps.map( + (step) => fs.copy(src, path.join(step.lambda, cmaFolder)) + ) + ); +} + +/** + * Delete cumulus message adapter from all tasks in the test workflow + * + * @param {Object} workflow - a test workflow object + * @param {string} cmaFolder - the name of the folder where CMA is copied to + * @returns {Promise.} an array of undefined values + */ +function deleteCMAFromTasks(workflow, cmaFolder) { + return Promise.all( + workflow.steps.map( + (step) => fs.remove(path.join(step.lambda, cmaFolder)) + ) + ); +} + +/** + * Build a cumulus message for a given workflow + * + * @param {Object} workflow - a test workflow object + * @param {Object} configOverride - a cumulus config override object + * @param {Array} cfOutputs - mocked outputs of a CloudFormation template + * @returns {Object} the generated cumulus message + */ +function messageBuilder(workflow, configOverride, cfOutputs) { + const workflowConfigs = {}; + workflow.steps.forEach((step) => { + workflowConfigs[step.name] = step.cumulusConfig; + }); + + const config = { + stack: 'somestack', + workflowConfigs: { + [workflow.name]: workflowConfigs + } + }; + Object.assign(config, configOverride); + config.stackName = config.stack; + + const message = template(workflow.name, { States: workflowConfigs }, config, cfOutputs); + message.cumulus_meta.message_source = 'local'; + return message; +} + +/** + * Runs a given workflow step (task) + * + * @param {string} lambdaPath - the local path to the task (e.g. path/to/task) + * @param {string} lambdaHandler - the lambda handler (e.g. 
index.handler) + * @param {Object} message - the cumulus message input for the task + * @param {string} stepName - name of the step/task + * @returns {Promise.<Object>} the cumulus message returned by the task + */ +async function runStep(lambdaPath, lambdaHandler, message, stepName) { + const taskFullPath = path.join(process.cwd(), lambdaPath); + const src = path.join(taskFullPath, 'adapter.zip'); + const dest = path.join(taskFullPath, 'cumulus-message-adapter'); + + process.env.CUMULUS_MESSAGE_ADAPTER_DIR = dest; + + // add step name to the message + message.cumulus_meta.task = stepName; + + try { + // run the task + const moduleFn = lambdaHandler.split('.'); + const moduleFileName = moduleFn[0]; + const moduleFunctionName = moduleFn[1]; + const task = require(`${taskFullPath}/${moduleFileName}`); // eslint-disable-line global-require + + console.log(`Started execution of ${stepName}`); + + return new Promise((resolve, reject) => { + task[moduleFunctionName](message, {}, (e, r) => { + if (e) return reject(e); + console.log(`Completed execution of ${stepName}`); + return resolve(r); + }); + }); + } + finally { + await fs.remove(src); + } +} + +/** + * Executes a given workflow by running each step in the workflow + * one after each other + * + * @param {Object} workflow - a test workflow object + * @param {Object} message - input message to the workflow + * @returns {Promise.<Object>} an object that includes the workflow input/output + * plus the output of every step + */ +async function runWorkflow(workflow, message) { + const trail = { + input: clone(message), + stepOutputs: {}, + output: {} + }; + + let stepInput = clone(message); + + for (const step of workflow.steps) { + stepInput = await runStep(step.lambda, step.handler, stepInput, step.name); + trail.stepOutputs[step.name] = clone(stepInput); + } + trail.output = clone(stepInput); + + return trail; +} + +module.exports = { + downloadCMA, + copyCMAToTasks, + deleteCMAFromTasks, + runStep, + runWorkflow, + messageBuilder 
+}; diff --git a/packages/integration-tests/package.json b/packages/integration-tests/package.json index 952d2d5a9a1..b262f965396 100644 --- a/packages/integration-tests/package.json +++ b/packages/integration-tests/package.json @@ -28,6 +28,7 @@ "license": "Apache-2.0", "dependencies": { "@cumulus/common": "^1.1.0", + "@cumulus/deployment": "^1.0.1", "babel-core": "^6.25.0", "babel-loader": "^6.2.4", "babel-plugin-transform-async-to-generator": "^6.24.1", @@ -35,6 +36,7 @@ "babel-preset-es2017": "^6.24.1", "commander": "^2.9.0", "fs-extra": "^5.0.0", + "lodash.clonedeep": "^4.5.0", "uuid": "^3.2.1", "webpack": "^1.12.13" } diff --git a/tests/fixtures/collections.json b/tests/fixtures/collections.json new file mode 100644 index 00000000000..efc59890b14 --- /dev/null +++ b/tests/fixtures/collections.json @@ -0,0 +1,34 @@ +{ + "MOD09GQ": { + "name": "MOD09GQ", + "version": "006", + "dataType": "MOD09GQ", + "process": "modis", + "provider_path": "/pdrs", + "granuleId": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf", + "granuleIdExtraction": "(MOD09GQ\\.(.*))\\.hdf", + "files": [ + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$", + "bucket": "protected", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf" + }, + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf\\.met$", + "bucket": "private", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf.met" + }, + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.meta\\.xml$", + "bucket": "protected", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.meta.xml" + }, + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$", + "bucket": "public", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104_1.jpg" + } + ] + } +} \ No newline at end of file diff --git a/tests/fixtures/providers.json b/tests/fixtures/providers.json new file mode 
100644 index 00000000000..3ea4d95904a --- /dev/null +++ b/tests/fixtures/providers.json @@ -0,0 +1,24 @@ +{ + "ftp": { + "id": "ftp", + "protocol": "ftp", + "host": "localhost", + "username": "testuser", + "password": "testpass" + }, + "http": { + "id": "http", + "protocol": "http", + "host": "http://localhost:3030", + "username": "fake", + "password": "fake" + }, + "sftp": { + "id": "MODAPS", + "protocol": "sftp", + "host": "localhost", + "port": 2222, + "username": "user", + "password": "password" + } +} \ No newline at end of file diff --git a/tests/fixtures/workflows/pdr_parse_ingest.json b/tests/fixtures/workflows/pdr_parse_ingest.json new file mode 100644 index 00000000000..14dc893c40a --- /dev/null +++ b/tests/fixtures/workflows/pdr_parse_ingest.json @@ -0,0 +1,55 @@ +{ + "DiscoverPdrs": { + "collection": "MOD09GQ", + "provider": "ftp", + "name": "DiscoverPdrs", + "steps": [ + { + "name": "DiscoverPdrs", + "lambda": "cumulus/tasks/discover-pdrs", + "handler": "index.handler", + "cumulusConfig": { + "templateUri": "{{$.meta.templates.ParsePdr}}", + "useQueue": true, + "queueUrl": "{{$.meta.queues.startSF}}", + "stack": "{{$.meta.stack}}", + "provider": "{{$.meta.provider}}", + "bucket": "{{$.meta.buckets.internal}}", + "collection": "{{$.meta.collection}}" + } + }, + { + "name": "QueuePdrs", + "lambda": "cumulus/tasks/queue-pdrs", + "handler": "index.handler", + "cumulusConfig": { + "provider": "{{$.meta.provider}}", + "collection": "{{$.meta.collection}}", + "queueUrl": "{{$.meta.queues.startSF}}", + "parsePdrMessageTemplateUri": "{{$.meta.templates.ParsePdr}}" + } + } + ] + }, + "ParsePdr": { + "collection": "MOD09GQ", + "provider": "ftp", + "name": "ParsePdr", + "steps": [ + { + "name": "ParsePdr", + "lambda": "cumulus/tasks/parse-pdr", + "handler": "index.handler", + "cumulusConfig": { + "useQueue": true, + "provider": "{{$.meta.provider}}", + "collection": "{{$.meta.collection}}", + "bucket": "{{$.meta.buckets.internal}}", + "stack": "{{$.meta.stack}}", 
+ "templateUri": "{{$.meta.templates.IngestGranule}}", + "queueUrl": "{{$.meta.queues.startSF}}" + } + } + ] + } +} \ No newline at end of file diff --git a/tests/ftp_pdr_parse_ingest.js b/tests/ftp_pdr_parse_ingest.js new file mode 100644 index 00000000000..63397eeecee --- /dev/null +++ b/tests/ftp_pdr_parse_ingest.js @@ -0,0 +1,126 @@ +'use strict'; + +const path = require('path'); +const test = require('ava'); +const fs = require('fs-extra'); +const { fetchMessageAdapter } = require('../packages/deployment/lib/adapter'); +const { + runWorkflow, + downloadCMA, + copyCMAToTasks, + deleteCMAFromTasks, + messageBuilder +} = require('../packages/integration-tests/local'); +const { randomString, createQueue } = require('../packages/common/test-utils'); +const { + recursivelyDeleteS3Bucket, + s3, + sqs, + receiveSQSMessages +} = require('../packages/common/aws'); +const workflowSet = require('./fixtures/workflows/pdr_parse_ingest.json'); +const collections = require('./fixtures/collections.json'); +const providers = require('./fixtures/providers.json'); + +// unfortunately t.context is not available in test.before +// this is fixed in ava 1.0.0 but it has a lot of breaking +// changes. The global variables below help with passing messages +// around between before and after hooks. 
+const context = {}; +const cmaFolder = 'cumulus-message-adapter'; + + +test.before(async() => { + context.internal = randomString(); + context.stack = randomString(); + context.templates = {}; + await s3().createBucket({ Bucket: context.internal }).promise(); + + // download and unzip the message adapter + const { src, dest } = await downloadCMA(); + context.src = src; + context.dest = dest; + + // create the queue + context.queueUrl = await createQueue(); + + const config = { + buckets: { + internal: context.internal + }, + stack: context.stack, + stepFunctions: {}, + sqs: {} + }; + + const cfOutputs = [{ + OutputKey: 'startSFSQSOutput', + OutputValue: context.queueUrl + }]; + + // create workflow templates + Object.keys(workflowSet).forEach((w) => { + config.stepFunctions[w] = {}; + }); + + const promises = Object.keys(workflowSet).map((w) => { + context.templates[w] = messageBuilder(workflowSet[w], config, cfOutputs); + return s3().putObject({ + Bucket: context.internal, + Key: `${context.stack}/workflows/${w}.json`, + Body: JSON.stringify(context.templates[w]) + }).promise(); + }); + + // upload templates + await Promise.all(promises); +}); + +test.serial('Discover and queue PDRs with FTP provider', async (t) => { + const workflow = workflowSet.DiscoverPdrs; + t.context.workflow = workflow; + const input = context.templates.DiscoverPdrs; + // copy cumulus-message-adapter + await copyCMAToTasks(workflow, context.dest, cmaFolder); + + input.meta.collection = collections[workflow.collection]; + input.meta.provider = providers.ftp; + const msg = await runWorkflow(workflow, input); + + // discover-pdr must return a list of PDRs + const pdrs = msg.stepOutputs.DiscoverPdrs.payload.pdrs; + t.true(Array.isArray(pdrs)); + t.is(pdrs.length, 4); + + t.is(msg.output.payload.pdrs_queued, pdrs.length); +}); + +test.serial('Parse Pdrs from the previous step', async (t) => { + const workflow = workflowSet.ParsePdr; + t.context.workflow = workflow; + + // copy 
cumulus-message-adapter + await copyCMAToTasks(workflow, context.dest, cmaFolder); + + const messages = await receiveSQSMessages(context.queueUrl, 4); + + for (const input of messages) { + const msg = await runWorkflow(workflow, input.Body); + t.truthy(msg.input.payload.pdr); + t.is( + msg.output.payload.granules.length, + msg.output.payload.granulesCount + ); + } +}); + +test.afterEach.always(async(t) => { + await deleteCMAFromTasks(t.context.workflow, cmaFolder); +}); + +test.after.always('final cleanup', async() => { + await recursivelyDeleteS3Bucket(context.internal); + await sqs().deleteQueue({ QueueUrl: context.queueUrl }).promise(); + await fs.remove(context.src); + await fs.remove(context.dest); +}); diff --git a/tests/sftp_pdr_parse_ingest.js b/tests/sftp_pdr_parse_ingest.js new file mode 100644 index 00000000000..8cc4c3b1fb4 --- /dev/null +++ b/tests/sftp_pdr_parse_ingest.js @@ -0,0 +1,125 @@ +'use strict'; + +const path = require('path'); +const test = require('ava'); +const fs = require('fs-extra'); +const { + runWorkflow, + downloadCMA, + copyCMAToTasks, + deleteCMAFromTasks, + messageBuilder +} = require('../packages/integration-tests/local'); +const { randomString, createQueue } = require('../packages/common/test-utils'); +const { + recursivelyDeleteS3Bucket, + s3, + sqs, + receiveSQSMessages +} = require('../packages/common/aws'); +const workflowSet = require('./fixtures/workflows/pdr_parse_ingest.json'); +const collections = require('./fixtures/collections.json'); +const providers = require('./fixtures/providers.json'); + +// unfortunately t.context is not available in test.before +// this is fixed in ava 1.0.0 but it has a lot of breaking +// changes. The global variables below help with passing messages +// around between before and after hooks. 
+const context = {}; +const cmaFolder = 'cumulus-message-adapter'; + + +test.before(async() => { + context.internal = randomString(); + context.stack = randomString(); + context.templates = {}; + await s3().createBucket({ Bucket: context.internal }).promise(); + + // download and unzip the message adapter + const { src, dest } = await downloadCMA(); + context.src = src; + context.dest = dest; + + // create the queue + context.queueUrl = await createQueue(); + + const config = { + buckets: { + internal: context.internal + }, + stack: context.stack, + stepFunctions: {}, + sqs: {} + }; + + const cfOutputs = [{ + OutputKey: 'startSFSQSOutput', + OutputValue: context.queueUrl + }]; + + // create workflow templates + Object.keys(workflowSet).forEach((w) => { + config.stepFunctions[w] = {}; + }); + + const promises = Object.keys(workflowSet).map((w) => { + context.templates[w] = messageBuilder(workflowSet[w], config, cfOutputs); + return s3().putObject({ + Bucket: context.internal, + Key: `${context.stack}/workflows/${w}.json`, + Body: JSON.stringify(context.templates[w]) + }).promise(); + }); + + // upload templates + await Promise.all(promises); +}); + +test.serial('Discover and queue PDRs with FTP provider', async (t) => { + const workflow = workflowSet.DiscoverPdrs; + t.context.workflow = workflow; + const input = context.templates.DiscoverPdrs; + // copy cumulus-message-adapter + await copyCMAToTasks(workflow, context.dest, cmaFolder); + + input.meta.collection = collections[workflow.collection]; + input.meta.provider = providers.sftp; + const msg = await runWorkflow(workflow, input); + + // discover-pdr must return a list of PDRs + const pdrs = msg.stepOutputs.DiscoverPdrs.payload.pdrs; + t.true(Array.isArray(pdrs)); + t.is(pdrs.length, 4); + + t.is(msg.output.payload.pdrs_queued, pdrs.length); +}); + +test.serial('Parse Pdrs from the previous step', async (t) => { + const workflow = workflowSet.ParsePdr; + t.context.workflow = workflow; + + // copy 
cumulus-message-adapter + await copyCMAToTasks(workflow, context.dest, cmaFolder); + + const messages = await receiveSQSMessages(context.queueUrl, 4); + + for (const input of messages) { + const msg = await runWorkflow(workflow, input.Body); + t.truthy(msg.input.payload.pdr); + t.is( + msg.output.payload.granules.length, + msg.output.payload.granulesCount + ); + } +}); + +test.afterEach.always(async(t) => { + await deleteCMAFromTasks(t.context.workflow, cmaFolder); +}); + +test.after.always('final cleanup', async() => { + await recursivelyDeleteS3Bucket(context.internal); + await sqs().deleteQueue({ QueueUrl: context.queueUrl }).promise(); + await fs.remove(context.src); + await fs.remove(context.dest); +});