From 621beb5420c111d990a3c89b0e7d9f1d408c757e Mon Sep 17 00:00:00 2001 From: Scisco Date: Thu, 8 Mar 2018 15:30:01 -0500 Subject: [PATCH 1/9] tools for local integration testing --- CHANGELOG.md | 3 + packages/integration-tests/local.js | 153 ++++++++++++++++++++++++ packages/integration-tests/package.json | 2 + 3 files changed, 158 insertions(+) create mode 100644 packages/integration-tests/local.js diff --git a/CHANGELOG.md b/CHANGELOG.md index e049730b339..1d428c989fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Added +- added tools to @cumulus/integration-tests for local integration testing + ### Updated - Broke up `kes.override.js` of @cumulus/deployment to multiple modules and moved to a new location - Expanded @cumulus/deployment test coverage diff --git a/packages/integration-tests/local.js b/packages/integration-tests/local.js new file mode 100644 index 00000000000..dbed0f90c9b --- /dev/null +++ b/packages/integration-tests/local.js @@ -0,0 +1,153 @@ +/** + * Includes helper functions for replicating Step Function Workflows + * locally + */ +'use strict'; + +const path = require('path'); +const fs = require('fs-extra'); +const clone = require('lodash.clonedeep'); +const { template } = require('@cumulus/deployment/lib/message'); + +/** + * Copy cumulus message adapter python folder to each task + * in the workflow + * + * @param {Object} workflow - a test workflow object + * @param {string} src - the path to the cumulus message adapter folder + * @param {string} cmaFolder - the name of the folder where CMA is copied to + * @returns {Promise.} an array of undefined values + */ +function copyCMAToTasks(workflow, src, cmaFolder) { + return Promise.all( + workflow.steps.map( + (step) => fs.copy(src, path.join(step.lambda, cmaFolder)) + ) + ); +} + +/** + * Delete cumulus message adapter from all tasks in the test workflow + * + * @param {Object} workflow - a test workflow object + * @param {string} cmaFolder - the name of the folder where CMA is copied to + * @returns {Promise.} an array of undefined values + */ +function deleteCMAFromTasks(workflow, cmaFolder) { + return Promise.all( + workflow.steps.map( + (step) => fs.remove(path.join(step.lambda, cmaFolder)) + ) + ); +} + +/** + * Build a cumulus message for a given workflow + * + * @param {Object} workflow - a test workflow object + * @param {Object} collection - the cumulus collection object + * @param {Object} provider - the cumulus provider object + * @param {string} bucket - the name of the s3 bucket used by system_bucket + * @returns {Object} the generated cumulus message + */ +function messageBuilder(workflow, collection, provider, bucket) { + const workflowConfigs = {} + workflow.steps.forEach((step) => { + workflowConfigs[step.name] = step.cumulusConfig; + }); + + const config = { + buckets: { + internal: bucket + }, + stackName: 'somestack', + workflowConfigs: { + [workflow.name]: workflowConfigs + } + }; + + const message = template(workflow.name, { States: workflowConfigs }, config, []); + message.meta.provider = provider; + message.meta.collection = collection; + message.cumulus_meta.message_source = 'local'; + return message; +} + +/** + * Runs a given workflow step (task) + * + * @param {string} lambdaPath - the local path to the task (e.g. path/to/task) + * @param {string} lambdaHandler - the lambda handler (e.g. index.hanlder) + * @param {Object} message - the cumulus message input for the task + * @param {string} stepName - name of the step/task + * @returns {Promise.} the cumulus message returned by the task + */ +async function runStep(lambdaPath, lambdaHandler, message, stepName) { + const taskFullPath = path.join(process.cwd(), lambdaPath); + const src = path.join(taskFullPath, 'adapter.zip'); + const dest = path.join(taskFullPath, 'cumulus-message-adapter'); + let resp; + + process.env.CUMULUS_MESSAGE_ADAPTER_DIR = dest; + + // add step name to the message + message.cumulus_meta.task = stepName; + + try { + // add message adapter to task folder + + // run the task + const moduleFn = lambdaHandler.split('.'); + const moduleFileName = moduleFn[0]; + const moduleFunctionName = moduleFn[1]; + const task = require(`${taskFullPath}/${moduleFileName}`); + + return new Promise((resolve, reject) => { + task[moduleFunctionName](message, {}, (e, r) => { + if (e) return reject(e); + return resolve(r); + }); + }); + } + finally { + await fs.remove(src); + } +} + +/** + * Executes a given workflow by running each step in the workflow + * one after each other + * + * @param {Object} workflow - a test workflow object + * @param {Object} collection - the cumulus collection object + * @param {Object} provider - the cumulus provider object + * @param {string} bucket - the name of the s3 bucket used by system_bucket + * @returns {Promise.} an object that includes the workflow input/output + * plus the output of every step + */ +async function runWorkflow(workflow, collection, provider, bucket) { + // build the input message + const message = messageBuilder(workflow, collection, provider, bucket); + const trail = { + input: clone(message), + stepOutputs: {}, + output: {} + }; + + let stepInput = clone(message); + + for (const step of workflow.steps) { + stepInput = await runStep(step.lambda, step.handler, stepInput, step.name) + trail.stepOutputs[step.name] = clone(stepInput); + } + trail.output = clone(stepInput); + + return trail; +} + +module.exports = { + copyCMAToTasks, + deleteCMAFromTasks, + runStep, + runWorkflow +}; diff --git a/packages/integration-tests/package.json b/packages/integration-tests/package.json index 952d2d5a9a1..b262f965396 100644 --- a/packages/integration-tests/package.json +++ b/packages/integration-tests/package.json @@ -28,6 +28,7 @@ "license": "Apache-2.0", "dependencies": { "@cumulus/common": "^1.1.0", + "@cumulus/deployment": "^1.0.1", "babel-core": "^6.25.0", "babel-loader": "^6.2.4", "babel-plugin-transform-async-to-generator": "^6.24.1", @@ -35,6 +36,7 @@ "babel-preset-es2017": "^6.24.1", "commander": "^2.9.0", "fs-extra": "^5.0.0", + "lodash.clonedeep": "^4.5.0", "uuid": "^3.2.1", "webpack": "^1.12.13" } From 08fbd75455df51e10961dbf6c05aad79f594c26f Mon Sep 17 00:00:00 2001 From: Scisco Date: Thu, 8 Mar 2018 15:32:22 -0500 Subject: [PATCH 2/9] first series of local integration tests --- package.json | 15 ++++- tests/discover_pdrs.js | 64 ++++++++++++++++++ tests/fixtures/collections/MOD09GQ.json | 32 +++++++++ .../providers/local_ftp_provider.json | 8 +++ tests/fixtures/workflows/DiscoverPdrs.json | 19 ++++++ tests/fixtures/workflows/DiscoverPdrs.yml | 0 tests/fixtures/workflows/IngestGranule.json | 67 +++++++++++++++++++ tests/fixtures/workflows/ParsePdr.json | 12 ++++ 8 files changed, 216 insertions(+), 1 deletion(-) create mode 100644 tests/discover_pdrs.js create mode 100644 tests/fixtures/collections/MOD09GQ.json create mode 100644 tests/fixtures/providers/local_ftp_provider.json create mode 100644 tests/fixtures/workflows/DiscoverPdrs.json create mode 100644 tests/fixtures/workflows/DiscoverPdrs.yml create mode 100644 tests/fixtures/workflows/IngestGranule.json create mode 100644 tests/fixtures/workflows/ParsePdr.json diff --git a/package.json b/package.json index f29f79b44ba..f7af9972599 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,7 @@ "version": "1.0.0", "description": "Cumulus Framework for ingesting and processing Nasa Earth data streams", "scripts": { + "e2e": "env TEST=true ava tests/*.js", "test": "lerna run test", "bootstrap": "lerna bootstrap", "ybootstrap": "lerna bootstrap --npm-client=yarn", @@ -17,6 +18,14 @@ "type": "git", "url": "https://github.com/cumulus-nasa/cumulus" }, + "ava": { + "files": "test", + "babel": "inherit", + "require": [ + "babel-polyfill", + "babel-register" + ] + }, "keywords": [ "GIBS", "CUMULUS", @@ -33,13 +42,14 @@ "author": "Cumulus Authors", "license": "Apache-2.0", "devDependencies": { - "ava": "^0.24.0", + "ava": "^0.25.0", "babel-core": "^6.13.2", "babel-eslint": "^6.1.2", "babel-loader": "^6.2.4", "babel-plugin-transform-async-to-generator": "^6.8.0", "babel-polyfill": "^6.13.0", "babel-preset-es2015": "^6.13.2", + "babel-preset-es2017": "^6.24.1", "copy-webpack-plugin": "^4.0.1", "eslint": "^3.2.2", "eslint-config-airbnb": "^10.0.0", @@ -55,5 +65,8 @@ "transform-loader": "^0.2.3", "webpack": "^1.13.3", "webpack-node-externals": "^1.5.4" + }, + "dependencies": { + "fs-extra": "^5.0.0" } } diff --git a/tests/discover_pdrs.js b/tests/discover_pdrs.js new file mode 100644 index 00000000000..74eefd327f7 --- /dev/null +++ b/tests/discover_pdrs.js @@ -0,0 +1,64 @@ +'use strict'; + +const path = require('path'); +const test = require('ava'); +const fs = require('fs-extra'); +const { fetchMessageAdapter } = require('../packages/deployment/lib/adapter'); +const { + runWorkflow, + copyCMAToTasks, + deleteCMAFromTasks +} = require('../packages/integration-tests/local'); +const { randomString } = require('../packages/common/test-utils'); +const { recursivelyDeleteS3Bucket, s3 } = require('../packages/common/aws'); +const DiscoverPdrsWorkflow = require('./fixtures/workflows/DiscoverPdrs.json'); +const IngestGranuleWorkflow = require('./fixtures/workflows/IngestGranule.json'); +const localFTPProvider = require('./fixtures/providers/local_ftp_provider.json'); +const mod09gq = require('./fixtures/collections/MOD09GQ.json'); + +// unfortunately t.context is not available in test.before +// this is fixed in ava 1.0.0 but it has a lot of breaking +// changes. The global variables below help with passing messages +// around between before and after hooks. +let internal; +let src; +let dest; +const cmaFolder = 'cumulus-message-adapter'; + + +test.before(async(t) => { + internal = randomString(); + await s3().createBucket({ Bucket: internal }).promise(); + + // download and unzip the message adapter + const gitPath = 'cumulus-nasa/cumulus-message-adapter'; + const filename = 'cumulus-message-adapter.zip'; + src = path.join(process.cwd(), 'tests', 'adapter.zip'); + dest = path.join(process.cwd(), 'tests', cmaFolder); + await fetchMessageAdapter(null, gitPath, filename, src, dest); +}); + +test('DiscoverPdr Workflow with FTP provider', async (t) => { + + try { + // copy cumulus-message-adapter + await copyCMAToTasks(DiscoverPdrsWorkflow, dest, cmaFolder); + + const msg = await runWorkflow(DiscoverPdrsWorkflow, mod09gq, localFTPProvider, internal); + + // discover-pdr must return a list of PDRs + const pdrs = msg.output.payload.pdrs; + t.true(Array.isArray(pdrs)); + t.is(pdrs.length, 4); + } + finally { + // remove cumulus-message-adapter from tasks + await deleteCMAFromTasks(DiscoverPdrsWorkflow, cmaFolder); + } +}); + +test.after.always('final cleanup', async(t) => { + await recursivelyDeleteS3Bucket(internal); + await fs.remove(src); + await fs.remove(dest); +}); diff --git a/tests/fixtures/collections/MOD09GQ.json b/tests/fixtures/collections/MOD09GQ.json new file mode 100644 index 00000000000..09319aa5299 --- /dev/null +++ b/tests/fixtures/collections/MOD09GQ.json @@ -0,0 +1,32 @@ +{ + "name": "MOD09GQ", + "version": "006", + "dataType": "MOD09GQ", + "process": "modis", + "provider_path": "/pdrs", + "granuleId": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf", + "granuleIdExtraction": "(MOD09GQ\\.(.*))\\.hdf", + "files": [ + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$", + "bucket": "protected", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf" + }, + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf\\.met$", + "bucket": "private", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf.met" + }, + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.meta\\.xml$", + "bucket": "protected", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.meta.xml" + }, + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$", + "bucket": "public", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104_1.jpg" + } + ] +} \ No newline at end of file diff --git a/tests/fixtures/providers/local_ftp_provider.json b/tests/fixtures/providers/local_ftp_provider.json new file mode 100644 index 00000000000..723cc05c21a --- /dev/null +++ b/tests/fixtures/providers/local_ftp_provider.json @@ -0,0 +1,8 @@ +{ + "id": "MODAPS", + "protocol": "sftp", + "host": "localhost", + "port": 2222, + "username": "user", + "password": "password" +} \ No newline at end of file diff --git a/tests/fixtures/workflows/DiscoverPdrs.json b/tests/fixtures/workflows/DiscoverPdrs.json new file mode 100644 index 00000000000..03084b88cee --- /dev/null +++ b/tests/fixtures/workflows/DiscoverPdrs.json @@ -0,0 +1,19 @@ +{ + "name": "DiscoverPdrs", + "steps": [ + { + "name": "DiscoverPdrs", + "lambda": "cumulus/tasks/discover-pdrs", + "handler": "index.handler", + "cumulusConfig": { + "templateUri": "{{$.meta.templates.ParsePdr}}", + "useQueue": true, + "queueUrl": "{{$.meta.queues.startSF}}", + "stack": "{{$.meta.stack}}", + "provider": "{{$.meta.provider}}", + "bucket": "{{$.meta.buckets.internal}}", + "collection": "{{$.meta.collection}}" + } + } + ] +} \ No newline at end of file diff --git a/tests/fixtures/workflows/DiscoverPdrs.yml b/tests/fixtures/workflows/DiscoverPdrs.yml new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/fixtures/workflows/IngestGranule.json b/tests/fixtures/workflows/IngestGranule.json new file mode 100644 index 00000000000..c0251340245 --- /dev/null +++ b/tests/fixtures/workflows/IngestGranule.json @@ -0,0 +1,67 @@ +{ + "SyncGranule": { + "buckets": "{{$.meta.buckets}}", + "provider": "{{$.meta.provider}}", + "collection": "{{$.meta.collection}}", + "cumulus_message": { + "outputs": [ + { + "source": "{{$.granules}}", + "destination": "{{$.meta.input_granules}}" + }, + { + "source": "{{$}}", + "destination": "{{$.payload}}" + } + ] + } + }, + "ChooseProcess": {}, + "AsterProcess": { + "buckets": "{{$.meta.buckets}}", + "distribution_endpoint": "{{$.meta.distribution_endpoint}}", + "granuleIdExtraction": "{{$.meta.collection.granuleIdExtraction}}", + "files_config": "{{$.meta.collection.files}}", + "url_path": "{{$.meta.collection.url_path}}", + "cumulus_message": { + "input": "{[$.payload.granules[*].files[*].filename]}", + "outputs": [ + { + "source": "{{$}}", + "destination": "{{$.meta.all_the_files}}" + }, + { + "source": "{{$}}", + "destination": "{{$.payload}}" + } + ] + } + }, + "ModisProcess": { + "buckets": "{{$.meta.buckets}}", + "distribution_endpoint": "{{$.meta.distribution_endpoint}}", + "granuleIdExtraction": "{{$.meta.collection.granuleIdExtraction}}", + "files_config": "{{$.meta.collection.files}}", + "url_path": "{{$.meta.collection.url_path}}", + "cumulus_message": { + "input": "{[$.payload.granules[*].files[*].filename]}", + "outputs": [ + { + "source": "{{$}}", + "destination": "{{$.meta.all_the_files}}" + }, + { + "source": "{{$}}", + "destination": "{{$.payload}}" + } + ] + } + }, + "CmrStep": { + "bucket": "{{$.meta.buckets.internal}}", + "stack": "{{$.meta.stack}}", + "granuleIdExtraction": "{{$.meta.collection.granuleIdExtraction}}", + "cmr": "{{$.meta.cmr}}", + "input_granules": "{{$.meta.input_granules}}" + } +} \ No newline at end of file diff --git a/tests/fixtures/workflows/ParsePdr.json b/tests/fixtures/workflows/ParsePdr.json new file mode 100644 index 00000000000..3a9a08d3004 --- /dev/null +++ b/tests/fixtures/workflows/ParsePdr.json @@ -0,0 +1,12 @@ +{ + "ParsePdr": { + "useQueue": true, + "provider": "{{$.meta.provider}}", + "collection": "{{$.meta.collection}}", + "bucket": "{{$.meta.buckets.internal}}", + "stack": "{{$.meta.stack}}", + "templateUri": "{{$.meta.templates.IngestGranule}}", + "queueUrl": "{{$.meta.queues.startSF}}" + }, + "CheckStatus": {} +} \ No newline at end of file From 92fbd39a3feb83080d683c355866afe33eb30b84 Mon Sep 17 00:00:00 2001 From: Scisco Date: Thu, 8 Mar 2018 15:34:11 -0500 Subject: [PATCH 3/9] add end to end tests to circleci --- .circleci/config.yml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index f225ec8357f..97bc5f8d4da 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -123,11 +123,17 @@ jobs: - ./cumulus/services/sfn-throttler/node_modules - run: - name: Running Test + name: Running Tests environment: LOCALSTACK_HOST: localstack command: yarn test + - run: + name: Running End to End Tests + environment: + LOCALSTACK_HOST: localstack + command: yarn e2e + build_and_publish: docker: - image: circleci/node:6.10 From 6e91a7f380453bcf0ffbeabbacf2f570c9938f05 Mon Sep 17 00:00:00 2001 From: Scisco Date: Thu, 8 Mar 2018 16:42:26 -0500 Subject: [PATCH 4/9] separate messageBuilder from runWorkflow --- packages/integration-tests/local.js | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/packages/integration-tests/local.js b/packages/integration-tests/local.js index dbed0f90c9b..5e0dec35051 100644 --- a/packages/integration-tests/local.js +++ b/packages/integration-tests/local.js @@ -47,26 +47,26 @@ function deleteCMAFromTasks(workflow, cmaFolder) { * @param {Object} workflow - a test workflow object * @param {Object} collection - the cumulus collection object * @param {Object} provider - the cumulus provider object - * @param {string} bucket - the name of the s3 bucket used by system_bucket + * @param {Object} configOverride - a cumulus config override object + * @param {Array} cfOutputs - mocked outputs of a CloudFormation template * @returns {Object} the generated cumulus message */ -function messageBuilder(workflow, collection, provider, bucket) { +function messageBuilder(workflow, collection, provider, configOverride, cfOutputs) { const workflowConfigs = {} workflow.steps.forEach((step) => { workflowConfigs[step.name] = step.cumulusConfig; }); const config = { - buckets: { - internal: bucket - }, stackName: 'somestack', + stack: 'somestack', workflowConfigs: { [workflow.name]: workflowConfigs } }; + Object.assign(config, configOverride); - const message = template(workflow.name, { States: workflowConfigs }, config, []); + const message = template(workflow.name, { States: workflowConfigs }, config, cfOutputs); message.meta.provider = provider; message.meta.collection = collection; message.cumulus_meta.message_source = 'local'; @@ -119,15 +119,11 @@ async function runStep(lambdaPath, lambdaHandler, message, stepName) { * one after each other * * @param {Object} workflow - a test workflow object - * @param {Object} collection - the cumulus collection object - * @param {Object} provider - the cumulus provider object - * @param {string} bucket - the name of the s3 bucket used by system_bucket + * @param {Object} message - input message to the workflow * @returns {Promise.} an object that includes the workflow input/output * plus the output of every step */ -async function runWorkflow(workflow, collection, provider, bucket) { - // build the input message - const message = messageBuilder(workflow, collection, provider, bucket); +async function runWorkflow(workflow, message) { const trail = { input: clone(message), stepOutputs: {}, @@ -149,5 +145,6 @@ module.exports = { copyCMAToTasks, deleteCMAFromTasks, runStep, - runWorkflow + runWorkflow, + messageBuilder }; From 1de2af251783699cac27f7983ff39e55c90c4b96 Mon Sep 17 00:00:00 2001 From: Scisco Date: Thu, 8 Mar 2018 16:42:35 -0500 Subject: [PATCH 5/9] delete redundant file --- tests/fixtures/workflows/DiscoverPdrs.yml | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 tests/fixtures/workflows/DiscoverPdrs.yml diff --git a/tests/fixtures/workflows/DiscoverPdrs.yml b/tests/fixtures/workflows/DiscoverPdrs.yml deleted file mode 100644 index e69de29bb2d..00000000000 From ef1107f8d32153d9f291bc93687dbfd9e2874bb5 Mon Sep 17 00:00:00 2001 From: Scisco Date: Fri, 9 Mar 2018 10:33:47 -0500 Subject: [PATCH 6/9] move sqs actions to common aws --- packages/common/aws.js | 46 +++++++++++++++++++++++++++++++++++++ packages/ingest/consumer.js | 6 ++--- 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/packages/common/aws.js b/packages/common/aws.js index 5477bbd89c7..faf268f0de1 100644 --- a/packages/common/aws.js +++ b/packages/common/aws.js @@ -500,6 +500,52 @@ exports.sendSQSMessage = (queueUrl, message) => { }).promise(); }; +/** + * Receives SQS messages from a given queue. The number of messages received + * can be set and the timeout is also adjustable. + * + * @param {string} queueUrl - url of the SQS queue + * @param {integer} numOfMessages - number of messages to read from the queue + * @param {integer} timeout - number of seconds it takes for a message to timeout + * @returns {Promise.} an array of messages + */ +exports.receiveSQSMessages = async (queueUrl, numOfMessages = 1, timeout = 30) => { + const params = { + QueueUrl: queueUrl, + AttributeNames: ['All'], + VisibilityTimeout: timeout, + MaxNumberOfMessages: numOfMessages + }; + + const messages = await exports.sqs().receiveMessage(params).promise(); + + // convert body from string to js object + if (Object.prototype.hasOwnProperty.call(messages, 'Messages')) { + messages.Messages.forEach((mes) => { + mes.Body = JSON.parse(mes.Body); // eslint-disable-line no-param-reassign + }); + + return messages.Messages; + } + return []; +}; + +/** + * Delete a given SQS message from a given queue. + * + * @param {string} queueUrl - url of the SQS queue + * @param {integer} receiptHandle - the unique identifier of the sQS message + * @returns {Promise} an AWS SQS response + */ +exports.deleteSQSMessage = async (queueUrl, receiptHandle) => { + const params = { + QueueUrl: queueUrl, + ReceiptHandle: receiptHandle + }; + + return exports.sqs().deleteMessage(params).promise(); +}; + /** * Returns execution ARN from a statement machine Arn and executionName * diff --git a/packages/ingest/consumer.js b/packages/ingest/consumer.js index da7db0d8c4b..e9c11214d0c 100644 --- a/packages/ingest/consumer.js +++ b/packages/ingest/consumer.js @@ -1,7 +1,7 @@ 'use strict'; const log = require('@cumulus/common/log'); -const aws = require('./aws'); +const { receiveSQSMessages, deleteSQSMessage } = require('@cumulus/common/aws'); class Consume { constructor(queueUrl, messageLimit = 1, timeLimit = 90) { @@ -15,7 +15,7 @@ class Consume { async processMessage(message, fn) { try { await fn(message); - await aws.SQS.deleteMessage(this.queueUrl, message.ReceiptHandle); + await deleteSQSMessage(this.queueUrl, message.ReceiptHandle); } catch (e) { log.error(e); @@ -25,7 +25,7 @@ class Consume { async processMessages(fn, messageLimit) { let counter = 0; while (!this.endConsume) { - const messages = await aws.SQS.receiveMessage(this.queueUrl, messageLimit); + const messages = await receiveSQSMessages(this.queueUrl, messageLimit); counter += messages.length; if (messages.length > 0) { From d531e169ef34b8932562eb6bcaa557a3ff84b3a0 Mon Sep 17 00:00:00 2001 From: Scisco Date: Fri, 9 Mar 2018 10:43:30 -0500 Subject: [PATCH 7/9] reorganize tests --- packages/integration-tests/local.js | 11 +- tests/discover_pdrs.js | 64 ----------- tests/fixtures/collections.json | 34 ++++++ tests/fixtures/collections/MOD09GQ.json | 32 ------ tests/fixtures/providers.json | 10 ++ .../providers/local_ftp_provider.json | 8 -- tests/fixtures/workflows/DiscoverPdrs.json | 19 ---- tests/fixtures/workflows/IngestGranule.json | 67 ----------- tests/fixtures/workflows/ParsePdr.json | 12 -- .../workflows/ftp_pdr_parse_ingest.json | 55 +++++++++ tests/ftp_pdr_parse_ingest.js | 104 ++++++++++++++++++ 11 files changed, 208 insertions(+), 208 deletions(-) delete mode 100644 tests/discover_pdrs.js create mode 100644 tests/fixtures/collections.json delete mode 100644 tests/fixtures/collections/MOD09GQ.json create mode 100644 tests/fixtures/providers.json delete mode 100644 tests/fixtures/providers/local_ftp_provider.json delete mode 100644 tests/fixtures/workflows/DiscoverPdrs.json delete mode 100644 tests/fixtures/workflows/IngestGranule.json delete mode 100644 tests/fixtures/workflows/ParsePdr.json create mode 100644 tests/fixtures/workflows/ftp_pdr_parse_ingest.json create mode 100644 tests/ftp_pdr_parse_ingest.js diff --git a/packages/integration-tests/local.js b/packages/integration-tests/local.js index 5e0dec35051..84aa9c7e7f5 100644 --- a/packages/integration-tests/local.js +++ b/packages/integration-tests/local.js @@ -45,30 +45,26 @@ function deleteCMAFromTasks(workflow, cmaFolder) { * Build a cumulus message for a given workflow * * @param {Object} workflow - a test workflow object - * @param {Object} collection - the cumulus collection object - * @param {Object} provider - the cumulus provider object * @param {Object} configOverride - a cumulus config override object * @param {Array} cfOutputs - mocked outputs of a CloudFormation template * @returns {Object} the generated cumulus message */ -function messageBuilder(workflow, collection, provider, configOverride, cfOutputs) { +function messageBuilder(workflow, configOverride, cfOutputs) { const workflowConfigs = {} workflow.steps.forEach((step) => { workflowConfigs[step.name] = step.cumulusConfig; }); const config = { - stackName: 'somestack', stack: 'somestack', workflowConfigs: { [workflow.name]: workflowConfigs } }; Object.assign(config, configOverride); + config.stackName = config.stack; const message = template(workflow.name, { States: workflowConfigs }, config, cfOutputs); - message.meta.provider = provider; - message.meta.collection = collection; message.cumulus_meta.message_source = 'local'; return message; } @@ -102,9 +98,12 @@ async function runStep(lambdaPath, lambdaHandler, message, stepName) { const moduleFunctionName = moduleFn[1]; const task = require(`${taskFullPath}/${moduleFileName}`); + console.log(`Started execution of ${stepName}`); + return new Promise((resolve, reject) => { task[moduleFunctionName](message, {}, (e, r) => { if (e) return reject(e); + console.log(`Completed execution of ${stepName}`); return resolve(r); }); }); diff --git a/tests/discover_pdrs.js b/tests/discover_pdrs.js deleted file mode 100644 index 74eefd327f7..00000000000 --- a/tests/discover_pdrs.js +++ /dev/null @@ -1,64 +0,0 @@ -'use strict'; - -const path = require('path'); -const test = require('ava'); -const fs = require('fs-extra'); -const { fetchMessageAdapter } = require('../packages/deployment/lib/adapter'); -const { - runWorkflow, - copyCMAToTasks, - deleteCMAFromTasks -} = require('../packages/integration-tests/local'); -const { randomString } = require('../packages/common/test-utils'); -const { recursivelyDeleteS3Bucket, s3 } = require('../packages/common/aws'); -const DiscoverPdrsWorkflow = require('./fixtures/workflows/DiscoverPdrs.json'); -const IngestGranuleWorkflow = require('./fixtures/workflows/IngestGranule.json'); -const localFTPProvider = require('./fixtures/providers/local_ftp_provider.json'); -const mod09gq = require('./fixtures/collections/MOD09GQ.json'); - -// unfortunately t.context is not available in test.before -// this is fixed in ava 1.0.0 but it has a lot of breaking -// changes. The global variables below help with passing messages -// around between before and after hooks. -let internal; -let src; -let dest; -const cmaFolder = 'cumulus-message-adapter'; - - -test.before(async(t) => { - internal = randomString(); - await s3().createBucket({ Bucket: internal }).promise(); - - // download and unzip the message adapter - const gitPath = 'cumulus-nasa/cumulus-message-adapter'; - const filename = 'cumulus-message-adapter.zip'; - src = path.join(process.cwd(), 'tests', 'adapter.zip'); - dest = path.join(process.cwd(), 'tests', cmaFolder); - await fetchMessageAdapter(null, gitPath, filename, src, dest); -}); - -test('DiscoverPdr Workflow with FTP provider', async (t) => { - - try { - // copy cumulus-message-adapter - await copyCMAToTasks(DiscoverPdrsWorkflow, dest, cmaFolder); - - const msg = await runWorkflow(DiscoverPdrsWorkflow, mod09gq, localFTPProvider, internal); - - // discover-pdr must return a list of PDRs - const pdrs = msg.output.payload.pdrs; - t.true(Array.isArray(pdrs)); - t.is(pdrs.length, 4); - } - finally { - // remove cumulus-message-adapter from tasks - await deleteCMAFromTasks(DiscoverPdrsWorkflow, cmaFolder); - } -}); - -test.after.always('final cleanup', async(t) => { - await recursivelyDeleteS3Bucket(internal); - await fs.remove(src); - await fs.remove(dest); -}); diff --git a/tests/fixtures/collections.json b/tests/fixtures/collections.json new file mode 100644 index 00000000000..f296c32d486 --- /dev/null +++ b/tests/fixtures/collections.json @@ -0,0 +1,34 @@ +{ + "MOD09GQ": { + "name": "MOD09GQ", + "version": "006", + "dataType": "MOD09GQ", + "process": "modis", + "provider_path": "/pdrs", + "granuleId": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf", + "granuleIdExtraction": "(MOD09GQ\\.(.*))\\.hdf", + "files": [ + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$", + "bucket": "protected", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf" + }, + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf\\.met$", + "bucket": "private", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf.met" + }, + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.meta\\.xml$", + "bucket": "protected", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.meta.xml" + }, + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$", + "bucket": "public", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104_1.jpg" + } + ] + } +} \ No newline at end of file diff --git a/tests/fixtures/collections/MOD09GQ.json b/tests/fixtures/collections/MOD09GQ.json deleted file mode 100644 index 09319aa5299..00000000000 --- a/tests/fixtures/collections/MOD09GQ.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "name": "MOD09GQ", - "version": "006", - "dataType": "MOD09GQ", - "process": "modis", - "provider_path": "/pdrs", - "granuleId": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf", - "granuleIdExtraction": "(MOD09GQ\\.(.*))\\.hdf", - "files": [ - { - "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$", - "bucket": "protected", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf" - }, - { - "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf\\.met$", - "bucket": "private", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf.met" - }, - { - "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.meta\\.xml$", - "bucket": "protected", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.meta.xml" - }, - { - "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$", - "bucket": "public", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104_1.jpg" - } - ] -} \ No newline at end of file diff --git a/tests/fixtures/providers.json b/tests/fixtures/providers.json new file mode 100644 index 00000000000..bc48519df37 --- /dev/null +++ b/tests/fixtures/providers.json @@ -0,0 +1,10 @@ +{ + "ftp": { + "id": "MODAPS", + "protocol": "sftp", + "host": "localhost", + "port": 2222, + "username": "user", + "password": "password" + } +} \ No newline at end of file diff --git a/tests/fixtures/providers/local_ftp_provider.json b/tests/fixtures/providers/local_ftp_provider.json deleted file mode 100644 index 723cc05c21a..00000000000 --- a/tests/fixtures/providers/local_ftp_provider.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "id": "MODAPS", - "protocol": "sftp", - "host": "localhost", - "port": 2222, - "username": "user", - "password": "password" -} \ No newline at end of file diff --git a/tests/fixtures/workflows/DiscoverPdrs.json b/tests/fixtures/workflows/DiscoverPdrs.json deleted file mode 100644 index 03084b88cee..00000000000 --- a/tests/fixtures/workflows/DiscoverPdrs.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "name": "DiscoverPdrs", - "steps": [ - { - "name": "DiscoverPdrs", - "lambda": "cumulus/tasks/discover-pdrs", - "handler": "index.handler", - "cumulusConfig": { - "templateUri": "{{$.meta.templates.ParsePdr}}", - "useQueue": true, - "queueUrl": "{{$.meta.queues.startSF}}", - "stack": "{{$.meta.stack}}", - "provider": "{{$.meta.provider}}", - "bucket": "{{$.meta.buckets.internal}}", - "collection": "{{$.meta.collection}}" - } - } - ] -} \ No newline at end of file diff --git a/tests/fixtures/workflows/IngestGranule.json b/tests/fixtures/workflows/IngestGranule.json deleted file mode 100644 index c0251340245..00000000000 --- a/tests/fixtures/workflows/IngestGranule.json +++ /dev/null @@ -1,67 +0,0 @@ -{ - "SyncGranule": { - "buckets": "{{$.meta.buckets}}", - "provider": "{{$.meta.provider}}", - "collection": "{{$.meta.collection}}", - "cumulus_message": { - "outputs": [ - { - "source": "{{$.granules}}", - "destination": "{{$.meta.input_granules}}" - }, - { - "source": "{{$}}", - "destination": "{{$.payload}}" - } - ] - } - }, - "ChooseProcess": {}, - "AsterProcess": { - "buckets": "{{$.meta.buckets}}", - "distribution_endpoint": "{{$.meta.distribution_endpoint}}", - "granuleIdExtraction": "{{$.meta.collection.granuleIdExtraction}}", - "files_config": "{{$.meta.collection.files}}", - "url_path": "{{$.meta.collection.url_path}}", - "cumulus_message": { - "input": "{[$.payload.granules[*].files[*].filename]}", - "outputs": [ - { - "source": "{{$}}", - "destination": "{{$.meta.all_the_files}}" - }, - { - "source": "{{$}}", - "destination": "{{$.payload}}" - } - ] - } - }, - "ModisProcess": { - "buckets": "{{$.meta.buckets}}", - "distribution_endpoint": "{{$.meta.distribution_endpoint}}", - "granuleIdExtraction": "{{$.meta.collection.granuleIdExtraction}}", - "files_config": "{{$.meta.collection.files}}", - "url_path": "{{$.meta.collection.url_path}}", - "cumulus_message": { - "input": "{[$.payload.granules[*].files[*].filename]}", - "outputs": [ - { - "source": "{{$}}", - "destination": "{{$.meta.all_the_files}}" - }, - { - "source": "{{$}}", - "destination": "{{$.payload}}" - } - ] - } - }, - "CmrStep": { - "bucket": "{{$.meta.buckets.internal}}", - "stack": "{{$.meta.stack}}", - "granuleIdExtraction": "{{$.meta.collection.granuleIdExtraction}}", - "cmr": "{{$.meta.cmr}}", - "input_granules": "{{$.meta.input_granules}}" - } -} \ No newline at end of file diff --git a/tests/fixtures/workflows/ParsePdr.json b/tests/fixtures/workflows/ParsePdr.json deleted file mode 100644 index 3a9a08d3004..00000000000 --- a/tests/fixtures/workflows/ParsePdr.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "ParsePdr": { - "useQueue": true, - "provider": "{{$.meta.provider}}", - "collection": "{{$.meta.collection}}", - "bucket": "{{$.meta.buckets.internal}}", - "stack": "{{$.meta.stack}}", - "templateUri": "{{$.meta.templates.IngestGranule}}", - "queueUrl": "{{$.meta.queues.startSF}}" - }, - "CheckStatus": {} -} \ No newline at end of file diff --git a/tests/fixtures/workflows/ftp_pdr_parse_ingest.json b/tests/fixtures/workflows/ftp_pdr_parse_ingest.json new file mode 100644 index 00000000000..81dfc45e02b --- /dev/null +++ b/tests/fixtures/workflows/ftp_pdr_parse_ingest.json @@ -0,0 +1,55 @@ +{ + "DiscoverPdrs": { + "collection": "MOD09GQ", + "provider": "ftp", + "name": "DiscoverPdrs", + "steps": [ + { + "name": "DiscoverPdrs", + "lambda": "cumulus/tasks/discover-pdrs", + "handler": "index.handler", + "cumulusConfig": { + "templateUri": "{{$.meta.templates.ParsePdr}}", + "useQueue": true, + "queueUrl": "{{$.meta.queues.startSF}}", + "stack": "{{$.meta.stack}}", + "provider": "{{$.meta.provider}}", + "bucket": "{{$.meta.buckets.internal}}", + "collection": "{{$.meta.collection}}" + } + }, + { + "name": "QueuePdrs", + "lambda": "cumulus/tasks/queue-pdrs", + "handler": "index.handler", + "cumulusConfig": { + "provider": "{{$.meta.provider}}", + "collection": "{{$.meta.collection}}", + "queueUrl": "{{$.meta.queues.startSF}}", + "parsePdrMessageTemplateUri": "{{$.meta.templates.ParsePdr}}" + } + } + ] + }, + "ParsePdr": { + "collection": "MOD09GQ", + "provider": "ftp", + "name": "ParsePdr", + "steps": [ + { + "name": "ParsePdr", + "lambda": "cumulus/tasks/parse-pdr", + "handler": "index.handler", + "cumulusConfig": { + "useQueue": true, + "provider": "{{$.meta.provider}}", + "collection": "{{$.meta.collection}}", + "bucket": "{{$.meta.buckets.internal}}", + "stack": "{{$.meta.stack}}", + "templateUri": "{{$.meta.templates.IngestGranule}}", + "queueUrl": "{{$.meta.queues.startSF}}" + } + } + ] + } +} \ No newline at end of file diff --git a/tests/ftp_pdr_parse_ingest.js b/tests/ftp_pdr_parse_ingest.js new file mode 100644 index 00000000000..f4b6015f5db --- /dev/null +++ b/tests/ftp_pdr_parse_ingest.js @@ -0,0 +1,104 @@ +'use strict'; + +const path = require('path'); +const test = require('ava'); +const fs = require('fs-extra'); +const { fetchMessageAdapter } = require('../packages/deployment/lib/adapter'); +const { + runWorkflow, + copyCMAToTasks, + deleteCMAFromTasks, + messageBuilder +} = require('../packages/integration-tests/local'); +const { randomString, createQueue } = require('../packages/common/test-utils'); +const { recursivelyDeleteS3Bucket, s3, sqs } = require('../packages/common/aws'); +const workflowSet = require('./fixtures/workflows/ftp_pdr_parse_ingest.json'); +const collections = require('./fixtures/collections.json'); +const providers = require('./fixtures/providers.json'); + +// unfortunately t.context is not available in test.before +// this is fixed in ava 1.0.0 but it has a lot of breaking +// changes. The global variables below help with passing messages +// around between before and after hooks. +const context = {}; +const cmaFolder = 'cumulus-message-adapter'; + + +test.before(async() => { + context.internal = randomString(); + context.stack = randomString(); + context.templates = {}; + await s3().createBucket({ Bucket: context.internal }).promise(); + + // download and unzip the message adapter + const gitPath = 'cumulus-nasa/cumulus-message-adapter'; + const filename = 'cumulus-message-adapter.zip'; + context.src = path.join(process.cwd(), 'tests', 'adapter.zip'); + context.dest = path.join(process.cwd(), 'tests', cmaFolder); + await fetchMessageAdapter(null, gitPath, filename, context.src, context.dest); + + // create the queue + context.queueUrl = await createQueue(); + + const config = { + buckets: { + internal: context.internal + }, + stack: context.stack, + stepFunctions: {}, + sqs: {} + }; + + const cfOutputs = [{ + OutputKey: 'startSFSQSOutput', + OutputValue: context.queueUrl + }]; + + // create workflow templates + Object.keys(workflowSet).forEach((w) => { + config.stepFunctions[w] = {}; + }); + + const promises = Object.keys(workflowSet).map((w) => { + context.templates[w] = messageBuilder(workflowSet[w], config, cfOutputs); + return s3().putObject({ + Bucket: context.internal, + Key: `${context.stack}/workflows/${w}.json`, + Body: JSON.stringify(context.templates[w]) + }).promise(); + }); + + // upload templates + await Promise.all(promises); +}); + +test.serial('DiscoverPdr Workflow with FTP provider', async (t) => { + const workflow = workflowSet.DiscoverPdrs; + const input = context.templates.DiscoverPdrs; + try { + // copy cumulus-message-adapter + await copyCMAToTasks(workflow, context.dest, cmaFolder); + + input.meta.collection = collections[workflow.collection]; + input.meta.provider = providers[workflow.provider]; + const msg = await runWorkflow(workflow, input); + + // discover-pdr must return a list of PDRs + const pdrs = msg.stepOutputs.DiscoverPdrs.payload.pdrs; + t.true(Array.isArray(pdrs)); + t.is(pdrs.length, 4); + + t.is(msg.output.payload.pdrs_queued, pdrs.length); + } + finally { + // remove cumulus-message-adapter from tasks + await deleteCMAFromTasks(workflow, cmaFolder); + } +}); + +test.after.always('final cleanup', async() => { + await recursivelyDeleteS3Bucket(context.internal); + await sqs().deleteQueue({ QueueUrl: context.queueUrl }).promise(); + await fs.remove(context.src); + await fs.remove(context.dest); +}); From a8f7c711b53b6a1c86ad5b46374998eda2e24727 Mon Sep 17 00:00:00 2001 From: Scisco Date: Fri, 9 Mar 2018 11:11:23 -0500 Subject: [PATCH 8/9] run the tests against ftp and sftp providers --- CHANGELOG.md | 2 + README.md | 4 + package.json | 2 +- tests/fixtures/providers.json | 14 ++ ...arse_ingest.json => pdr_parse_ingest.json} | 0 tests/ftp_pdr_parse_ingest.js | 63 ++++++--- tests/sftp_pdr_parse_ingest.js | 127 ++++++++++++++++++ 7 files changed, 191 insertions(+), 21 deletions(-) rename tests/fixtures/workflows/{ftp_pdr_parse_ingest.json => pdr_parse_ingest.json} (100%) create mode 100644 tests/sftp_pdr_parse_ingest.js diff --git a/CHANGELOG.md b/CHANGELOG.md index d5c5c32e453..4e4ea54c130 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added - added tools to @cumulus/integration-tests for local integration testing +- added end to end testing for discovering and parsing of PDRs +- `yarn e2e` command is available for end to end testing ### Fixed - **CUMULUS-175: "Dashboard providers not in sync with AWS providers."** The root cause of this bug - DynamoDB operations not showing up in Elasticsearch - was shared by collections and rules. The fix was to update providers', collections' and rules; POST, PUT and DELETE endpoints to operate on DynamoDB and using DynamoDB streams to update Elasticsearch. The following packages were made: diff --git a/README.md b/README.md index fefabe9d533..8d5afe9d4f7 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,10 @@ Run the test commands next $ yarn test +Run end to end tests by + + $ yarn e2e + ## Adding New Packages Create a new folder under `packages` if it is a common library or create folder under `cumulus/tasks` if it is a lambda task. `cd` to the folder and run `npm init`. diff --git a/package.json b/package.json index f7af9972599..43ea36231c9 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,7 @@ "version": "1.0.0", "description": "Cumulus Framework for ingesting and processing Nasa Earth data streams", "scripts": { - "e2e": "env TEST=true ava tests/*.js", + "e2e": "env TEST=true ava tests/*.js --serial", "test": "lerna run test", "bootstrap": "lerna bootstrap", "ybootstrap": "lerna bootstrap --npm-client=yarn", diff --git a/tests/fixtures/providers.json b/tests/fixtures/providers.json index bc48519df37..9ddab63bbe8 100644 --- a/tests/fixtures/providers.json +++ b/tests/fixtures/providers.json @@ -1,5 +1,19 @@ { "ftp": { + "id": "ftp", + "protocol": "ftp", + "host": "localhost", + "username": "testuser", + "password": "testpass" + }, + "http": { + "id": "http", + "protocol": "http", + "host": "http://localhost:3030", + "username": "fake", + "password": "fake" + }, + "sftp": { "id": "MODAPS", "protocol": "sftp", "host": "localhost", diff --git a/tests/fixtures/workflows/ftp_pdr_parse_ingest.json b/tests/fixtures/workflows/pdr_parse_ingest.json similarity index 100% rename from tests/fixtures/workflows/ftp_pdr_parse_ingest.json rename to tests/fixtures/workflows/pdr_parse_ingest.json diff --git a/tests/ftp_pdr_parse_ingest.js b/tests/ftp_pdr_parse_ingest.js index f4b6015f5db..3cc335fd729 100644 --- a/tests/ftp_pdr_parse_ingest.js +++ b/tests/ftp_pdr_parse_ingest.js @@ -11,8 +11,13 @@ const { messageBuilder } = require('../packages/integration-tests/local'); const { randomString, createQueue } = require('../packages/common/test-utils'); -const { recursivelyDeleteS3Bucket, s3, sqs } = require('../packages/common/aws'); -const workflowSet = require('./fixtures/workflows/ftp_pdr_parse_ingest.json'); +const { + recursivelyDeleteS3Bucket, + s3, + sqs, + receiveSQSMessages +} = require('../packages/common/aws'); +const workflowSet = require('./fixtures/workflows/pdr_parse_ingest.json'); const collections = require('./fixtures/collections.json'); const providers = require('./fixtures/providers.json'); @@ -33,8 +38,8 @@ test.before(async() => { // download and unzip the message adapter const gitPath = 'cumulus-nasa/cumulus-message-adapter'; const filename = 'cumulus-message-adapter.zip'; - context.src = path.join(process.cwd(), 'tests', 'adapter.zip'); - context.dest = path.join(process.cwd(), 'tests', cmaFolder); + context.src = path.join(process.cwd(), 'tests', `${randomString()}.zip`); + context.dest = path.join(process.cwd(), 'tests', randomString()); await fetchMessageAdapter(null, gitPath, filename, context.src, context.dest); // create the queue @@ -72,30 +77,48 @@ test.before(async() => { await Promise.all(promises); }); -test.serial('DiscoverPdr Workflow with FTP provider', async (t) => { +test.serial('Discover and queue PDRs with FTP provider', async (t) => { const workflow = workflowSet.DiscoverPdrs; + t.context.workflow = workflow; const input = context.templates.DiscoverPdrs; - try { - // copy cumulus-message-adapter - await copyCMAToTasks(workflow, context.dest, cmaFolder); + // copy cumulus-message-adapter + await copyCMAToTasks(workflow, context.dest, cmaFolder); - input.meta.collection = collections[workflow.collection]; - input.meta.provider = providers[workflow.provider]; - const msg = await runWorkflow(workflow, input); + input.meta.collection = collections[workflow.collection]; + input.meta.provider = providers.ftp; + const msg = await runWorkflow(workflow, input); - // discover-pdr must return a list of PDRs - const pdrs = msg.stepOutputs.DiscoverPdrs.payload.pdrs; - t.true(Array.isArray(pdrs)); - t.is(pdrs.length, 4); + // discover-pdr must return a list of PDRs + const pdrs = msg.stepOutputs.DiscoverPdrs.payload.pdrs; + t.true(Array.isArray(pdrs)); + t.is(pdrs.length, 4); - t.is(msg.output.payload.pdrs_queued, pdrs.length); - } - finally { - // remove cumulus-message-adapter from tasks - await deleteCMAFromTasks(workflow, cmaFolder); + t.is(msg.output.payload.pdrs_queued, pdrs.length); +}); + +test.serial('Parse Pdrs from the previous step', async (t) => { + const workflow = workflowSet.ParsePdr; + t.context.workflow = workflow; + + // copy cumulus-message-adapter + await copyCMAToTasks(workflow, context.dest, cmaFolder); + + const messages = await receiveSQSMessages(context.queueUrl, 4); + + for (const input of messages) { + const msg = await runWorkflow(workflow, input.Body); + t.truthy(msg.input.payload.pdr); + t.is( + msg.output.payload.granules.length, + msg.output.payload.granulesCount + ); } }); +test.afterEach.always(async(t) => { + await deleteCMAFromTasks(t.context.workflow, cmaFolder); +}); + test.after.always('final cleanup', async() => { await recursivelyDeleteS3Bucket(context.internal); await sqs().deleteQueue({ QueueUrl: context.queueUrl }).promise(); diff --git a/tests/sftp_pdr_parse_ingest.js b/tests/sftp_pdr_parse_ingest.js new file mode 100644 index 00000000000..06bace870ba --- /dev/null +++ b/tests/sftp_pdr_parse_ingest.js @@ -0,0 +1,127 @@ +'use strict'; + +const path = require('path'); +const test = require('ava'); +const fs = require('fs-extra'); +const { fetchMessageAdapter } = require('../packages/deployment/lib/adapter'); +const { + runWorkflow, + copyCMAToTasks, + deleteCMAFromTasks, + messageBuilder +} = require('../packages/integration-tests/local'); +const { randomString, createQueue } = require('../packages/common/test-utils'); +const { + recursivelyDeleteS3Bucket, + s3, + sqs, + receiveSQSMessages +} = require('../packages/common/aws'); +const workflowSet = require('./fixtures/workflows/pdr_parse_ingest.json'); +const collections = require('./fixtures/collections.json'); +const providers = require('./fixtures/providers.json'); + +// unfortunately t.context is not available in test.before +// this is fixed in ava 1.0.0 but it has a lot of breaking +// changes. The global variables below help with passing messages +// around between before and after hooks. +const context = {}; +const cmaFolder = 'cumulus-message-adapter'; + + +test.before(async() => { + context.internal = randomString(); + context.stack = randomString(); + context.templates = {}; + await s3().createBucket({ Bucket: context.internal }).promise(); + + // download and unzip the message adapter + const gitPath = 'cumulus-nasa/cumulus-message-adapter'; + const filename = 'cumulus-message-adapter.zip'; + context.src = path.join(process.cwd(), 'tests', `${randomString()}.zip`); + context.dest = path.join(process.cwd(), 'tests', randomString()); + await fetchMessageAdapter(null, gitPath, filename, context.src, context.dest); + + // create the queue + context.queueUrl = await createQueue(); + + const config = { + buckets: { + internal: context.internal + }, + stack: context.stack, + stepFunctions: {}, + sqs: {} + }; + + const cfOutputs = [{ + OutputKey: 'startSFSQSOutput', + OutputValue: context.queueUrl + }]; + + // create workflow templates + Object.keys(workflowSet).forEach((w) => { + config.stepFunctions[w] = {}; + }); + + const promises = Object.keys(workflowSet).map((w) => { + context.templates[w] = messageBuilder(workflowSet[w], config, cfOutputs); + return s3().putObject({ + Bucket: context.internal, + Key: `${context.stack}/workflows/${w}.json`, + Body: JSON.stringify(context.templates[w]) + }).promise(); + }); + + // upload templates + await Promise.all(promises); +}); + +test.serial('Discover and queue PDRs with FTP provider', async (t) => { + const workflow = workflowSet.DiscoverPdrs; + t.context.workflow = workflow; + const input = context.templates.DiscoverPdrs; + // copy cumulus-message-adapter + await copyCMAToTasks(workflow, context.dest, cmaFolder); + + input.meta.collection = collections[workflow.collection]; + input.meta.provider = providers.sftp; + const msg = await runWorkflow(workflow, input); + + // discover-pdr must return a list of PDRs + const pdrs = msg.stepOutputs.DiscoverPdrs.payload.pdrs; + t.true(Array.isArray(pdrs)); + t.is(pdrs.length, 4); + + t.is(msg.output.payload.pdrs_queued, pdrs.length); +}); + +test.serial('Parse Pdrs from the previous step', async (t) => { + const workflow = workflowSet.ParsePdr; + t.context.workflow = workflow; + + // copy cumulus-message-adapter + await copyCMAToTasks(workflow, context.dest, cmaFolder); + + const messages = await receiveSQSMessages(context.queueUrl, 4); + + for (const input of messages) { + const msg = await runWorkflow(workflow, input.Body); + t.truthy(msg.input.payload.pdr); + t.is( + msg.output.payload.granules.length, + msg.output.payload.granulesCount + ); + } +}); + +test.afterEach.always(async(t) => { + await deleteCMAFromTasks(t.context.workflow, cmaFolder); +}); + +test.after.always('final cleanup', async() => { + await recursivelyDeleteS3Bucket(context.internal); + await sqs().deleteQueue({ QueueUrl: context.queueUrl }).promise(); + await fs.remove(context.src); + await fs.remove(context.dest); +}); From 29c7d8d245c0956ee080bcff6030fff0862145ca Mon Sep 17 00:00:00 2001 From: Scisco Date: Fri, 9 Mar 2018 17:00:02 -0500 Subject: [PATCH 9/9] address pr comments --- packages/common/aws.js | 2 +- packages/integration-tests/local.js | 35 ++++-- tests/fixtures/collections.json | 64 +++++------ tests/fixtures/providers.json | 44 ++++---- .../fixtures/workflows/pdr_parse_ingest.json | 102 +++++++++--------- tests/ftp_pdr_parse_ingest.js | 9 +- tests/sftp_pdr_parse_ingest.js | 10 +- 7 files changed, 141 insertions(+), 125 deletions(-) diff --git a/packages/common/aws.js b/packages/common/aws.js index faf268f0de1..c6fe7460e7c 100644 --- a/packages/common/aws.js +++ b/packages/common/aws.js @@ -537,7 +537,7 @@ exports.receiveSQSMessages = async (queueUrl, numOfMessages = 1, timeout = 30) = * @param {integer} receiptHandle - the unique identifier of the sQS message * @returns {Promise} an AWS SQS response */ -exports.deleteSQSMessage = async (queueUrl, receiptHandle) => { +exports.deleteSQSMessage = (queueUrl, receiptHandle) => { const params = { QueueUrl: queueUrl, ReceiptHandle: receiptHandle diff --git a/packages/integration-tests/local.js b/packages/integration-tests/local.js index 84aa9c7e7f5..e28bd51e207 100644 --- a/packages/integration-tests/local.js +++ b/packages/integration-tests/local.js @@ -7,7 +7,28 @@ const path = require('path'); const fs = require('fs-extra'); const clone = require('lodash.clonedeep'); +const { randomString } = require('@cumulus/common/test-utils'); const { template } = require('@cumulus/deployment/lib/message'); +const { fetchMessageAdapter } = require('@cumulus/deployment/lib/adapter'); + +/** + * Download cumulus message adapter (CMA) and unzip it + * + * @param {string} version - cumulus message adapter version number (optional) + * @returns {Promise.} an object with path to the zip and extracted CMA + */ +async function downloadCMA(version) { + // download and unzip the message adapter + const gitPath = 'cumulus-nasa/cumulus-message-adapter'; + const filename = 'cumulus-message-adapter.zip'; + const src = path.join(process.cwd(), 'tests', `${randomString()}.zip`); + const dest = path.join(process.cwd(), 'tests', randomString()); + await fetchMessageAdapter(version, gitPath, filename, src, dest); + return { + src, + dest + }; +} /** * Copy cumulus message adapter python folder to each task @@ -50,13 +71,13 @@ function deleteCMAFromTasks(workflow, cmaFolder) { * @returns {Object} the generated cumulus message */ function messageBuilder(workflow, configOverride, cfOutputs) { - const workflowConfigs = {} + const workflowConfigs = {}; workflow.steps.forEach((step) => { workflowConfigs[step.name] = step.cumulusConfig; }); const config = { - stack: 'somestack', + stack: 'somestack', workflowConfigs: { [workflow.name]: workflowConfigs } @@ -73,7 +94,7 @@ function messageBuilder(workflow, configOverride, cfOutputs) { * Runs a given workflow step (task) * * @param {string} lambdaPath - the local path to the task (e.g. path/to/task) - * @param {string} lambdaHandler - the lambda handler (e.g. index.hanlder) + * @param {string} lambdaHandler - the lambda handler (e.g. index.hanlder) * @param {Object} message - the cumulus message input for the task * @param {string} stepName - name of the step/task * @returns {Promise.} the cumulus message returned by the task @@ -82,7 +103,6 @@ async function runStep(lambdaPath, lambdaHandler, message, stepName) { const taskFullPath = path.join(process.cwd(), lambdaPath); const src = path.join(taskFullPath, 'adapter.zip'); const dest = path.join(taskFullPath, 'cumulus-message-adapter'); - let resp; process.env.CUMULUS_MESSAGE_ADAPTER_DIR = dest; @@ -90,13 +110,11 @@ async function runStep(lambdaPath, lambdaHandler, message, stepName) { message.cumulus_meta.task = stepName; try { - // add message adapter to task folder - // run the task const moduleFn = lambdaHandler.split('.'); const moduleFileName = moduleFn[0]; const moduleFunctionName = moduleFn[1]; - const task = require(`${taskFullPath}/${moduleFileName}`); + const task = require(`${taskFullPath}/${moduleFileName}`); // eslint-disable-line global-require console.log(`Started execution of ${stepName}`); @@ -132,7 +150,7 @@ async function runWorkflow(workflow, message) { let stepInput = clone(message); for (const step of workflow.steps) { - stepInput = await runStep(step.lambda, step.handler, stepInput, step.name) + stepInput = await runStep(step.lambda, step.handler, stepInput, step.name); trail.stepOutputs[step.name] = clone(stepInput); } trail.output = clone(stepInput); @@ -141,6 +159,7 @@ async function runWorkflow(workflow, message) { } module.exports = { + downloadCMA, copyCMAToTasks, deleteCMAFromTasks, runStep, diff --git a/tests/fixtures/collections.json b/tests/fixtures/collections.json index f296c32d486..efc59890b14 100644 --- a/tests/fixtures/collections.json +++ b/tests/fixtures/collections.json @@ -1,34 +1,34 @@ { - "MOD09GQ": { - "name": "MOD09GQ", - "version": "006", - "dataType": "MOD09GQ", - "process": "modis", - "provider_path": "/pdrs", - "granuleId": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf", - "granuleIdExtraction": "(MOD09GQ\\.(.*))\\.hdf", - "files": [ - { - "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$", - "bucket": "protected", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf" - }, - { - "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf\\.met$", - "bucket": "private", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf.met" - }, - { - "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.meta\\.xml$", - "bucket": "protected", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.meta.xml" - }, - { - "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$", - "bucket": "public", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104_1.jpg" - } - ] - } + "MOD09GQ": { + "name": "MOD09GQ", + "version": "006", + "dataType": "MOD09GQ", + "process": "modis", + "provider_path": "/pdrs", + "granuleId": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf", + "granuleIdExtraction": "(MOD09GQ\\.(.*))\\.hdf", + "files": [ + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$", + "bucket": "protected", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf" + }, + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf\\.met$", + "bucket": "private", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf.met" + }, + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.meta\\.xml$", + "bucket": "protected", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.meta.xml" + }, + { + "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$", + "bucket": "public", + "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104_1.jpg" + } + ] + } } \ No newline at end of file diff --git a/tests/fixtures/providers.json b/tests/fixtures/providers.json index 9ddab63bbe8..3ea4d95904a 100644 --- a/tests/fixtures/providers.json +++ b/tests/fixtures/providers.json @@ -1,24 +1,24 @@ { - "ftp": { - "id": "ftp", - "protocol": "ftp", - "host": "localhost", - "username": "testuser", - "password": "testpass" - }, - "http": { - "id": "http", - "protocol": "http", - "host": "http://localhost:3030", - "username": "fake", - "password": "fake" - }, - "sftp": { - "id": "MODAPS", - "protocol": "sftp", - "host": "localhost", - "port": 2222, - "username": "user", - "password": "password" - } + "ftp": { + "id": "ftp", + "protocol": "ftp", + "host": "localhost", + "username": "testuser", + "password": "testpass" + }, + "http": { + "id": "http", + "protocol": "http", + "host": "http://localhost:3030", + "username": "fake", + "password": "fake" + }, + "sftp": { + "id": "MODAPS", + "protocol": "sftp", + "host": "localhost", + "port": 2222, + "username": "user", + "password": "password" + } } \ No newline at end of file diff --git a/tests/fixtures/workflows/pdr_parse_ingest.json b/tests/fixtures/workflows/pdr_parse_ingest.json index 81dfc45e02b..14dc893c40a 100644 --- a/tests/fixtures/workflows/pdr_parse_ingest.json +++ b/tests/fixtures/workflows/pdr_parse_ingest.json @@ -1,55 +1,55 @@ { - "DiscoverPdrs": { - "collection": "MOD09GQ", - "provider": "ftp", + "DiscoverPdrs": { + "collection": "MOD09GQ", + "provider": "ftp", + "name": "DiscoverPdrs", + "steps": [ + { "name": "DiscoverPdrs", - "steps": [ - { - "name": "DiscoverPdrs", - "lambda": "cumulus/tasks/discover-pdrs", - "handler": "index.handler", - "cumulusConfig": { - "templateUri": "{{$.meta.templates.ParsePdr}}", - "useQueue": true, - "queueUrl": "{{$.meta.queues.startSF}}", - "stack": "{{$.meta.stack}}", - "provider": "{{$.meta.provider}}", - "bucket": "{{$.meta.buckets.internal}}", - "collection": "{{$.meta.collection}}" - } - }, - { - "name": "QueuePdrs", - "lambda": "cumulus/tasks/queue-pdrs", - "handler": "index.handler", - "cumulusConfig": { - "provider": "{{$.meta.provider}}", - "collection": "{{$.meta.collection}}", - "queueUrl": "{{$.meta.queues.startSF}}", - "parsePdrMessageTemplateUri": "{{$.meta.templates.ParsePdr}}" - } - } - ] - }, - "ParsePdr": { - "collection": "MOD09GQ", - "provider": "ftp", + "lambda": "cumulus/tasks/discover-pdrs", + "handler": "index.handler", + "cumulusConfig": { + "templateUri": "{{$.meta.templates.ParsePdr}}", + "useQueue": true, + "queueUrl": "{{$.meta.queues.startSF}}", + "stack": "{{$.meta.stack}}", + "provider": "{{$.meta.provider}}", + "bucket": "{{$.meta.buckets.internal}}", + "collection": "{{$.meta.collection}}" + } + }, + { + "name": "QueuePdrs", + "lambda": "cumulus/tasks/queue-pdrs", + "handler": "index.handler", + "cumulusConfig": { + "provider": "{{$.meta.provider}}", + "collection": "{{$.meta.collection}}", + "queueUrl": "{{$.meta.queues.startSF}}", + "parsePdrMessageTemplateUri": "{{$.meta.templates.ParsePdr}}" + } + } + ] + }, + "ParsePdr": { + "collection": "MOD09GQ", + "provider": "ftp", + "name": "ParsePdr", + "steps": [ + { "name": "ParsePdr", - "steps": [ - { - "name": "ParsePdr", - "lambda": "cumulus/tasks/parse-pdr", - "handler": "index.handler", - "cumulusConfig": { - "useQueue": true, - "provider": "{{$.meta.provider}}", - "collection": "{{$.meta.collection}}", - "bucket": "{{$.meta.buckets.internal}}", - "stack": "{{$.meta.stack}}", - "templateUri": "{{$.meta.templates.IngestGranule}}", - "queueUrl": "{{$.meta.queues.startSF}}" - } - } - ] - } + "lambda": "cumulus/tasks/parse-pdr", + "handler": "index.handler", + "cumulusConfig": { + "useQueue": true, + "provider": "{{$.meta.provider}}", + "collection": "{{$.meta.collection}}", + "bucket": "{{$.meta.buckets.internal}}", + "stack": "{{$.meta.stack}}", + "templateUri": "{{$.meta.templates.IngestGranule}}", + "queueUrl": "{{$.meta.queues.startSF}}" + } + } + ] + } } \ No newline at end of file diff --git a/tests/ftp_pdr_parse_ingest.js b/tests/ftp_pdr_parse_ingest.js index 3cc335fd729..63397eeecee 100644 --- a/tests/ftp_pdr_parse_ingest.js +++ b/tests/ftp_pdr_parse_ingest.js @@ -6,6 +6,7 @@ const fs = require('fs-extra'); const { fetchMessageAdapter } = require('../packages/deployment/lib/adapter'); const { runWorkflow, + downloadCMA, copyCMAToTasks, deleteCMAFromTasks, messageBuilder @@ -36,11 +37,9 @@ test.before(async() => { await s3().createBucket({ Bucket: context.internal }).promise(); // download and unzip the message adapter - const gitPath = 'cumulus-nasa/cumulus-message-adapter'; - const filename = 'cumulus-message-adapter.zip'; - context.src = path.join(process.cwd(), 'tests', `${randomString()}.zip`); - context.dest = path.join(process.cwd(), 'tests', randomString()); - await fetchMessageAdapter(null, gitPath, filename, context.src, context.dest); + const { src, dest } = await downloadCMA(); + context.src = src; + context.dest = dest; // create the queue context.queueUrl = await createQueue(); diff --git a/tests/sftp_pdr_parse_ingest.js b/tests/sftp_pdr_parse_ingest.js index 06bace870ba..8cc4c3b1fb4 100644 --- a/tests/sftp_pdr_parse_ingest.js +++ b/tests/sftp_pdr_parse_ingest.js @@ -3,9 +3,9 @@ const path = require('path'); const test = require('ava'); const fs = require('fs-extra'); -const { fetchMessageAdapter } = require('../packages/deployment/lib/adapter'); const { runWorkflow, + downloadCMA, copyCMAToTasks, deleteCMAFromTasks, messageBuilder @@ -36,11 +36,9 @@ test.before(async() => { await s3().createBucket({ Bucket: context.internal }).promise(); // download and unzip the message adapter - const gitPath = 'cumulus-nasa/cumulus-message-adapter'; - const filename = 'cumulus-message-adapter.zip'; - context.src = path.join(process.cwd(), 'tests', `${randomString()}.zip`); - context.dest = path.join(process.cwd(), 'tests', randomString()); - await fetchMessageAdapter(null, gitPath, filename, context.src, context.dest); + const { src, dest } = await downloadCMA(); + context.src = src; + context.dest = dest; // create the queue context.queueUrl = await createQueue();