Skip to content

Commit

Permalink
address pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Scisco committed Mar 9, 2018
1 parent a8f7c71 commit 29c7d8d
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 125 deletions.
2 changes: 1 addition & 1 deletion packages/common/aws.js
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ exports.receiveSQSMessages = async (queueUrl, numOfMessages = 1, timeout = 30) =
* @param {integer} receiptHandle - the unique identifier of the sQS message
* @returns {Promise} an AWS SQS response
*/
exports.deleteSQSMessage = async (queueUrl, receiptHandle) => {
exports.deleteSQSMessage = (queueUrl, receiptHandle) => {
const params = {
QueueUrl: queueUrl,
ReceiptHandle: receiptHandle
Expand Down
35 changes: 27 additions & 8 deletions packages/integration-tests/local.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,28 @@
const path = require('path');
const fs = require('fs-extra');
const clone = require('lodash.clonedeep');
const { randomString } = require('@cumulus/common/test-utils');
const { template } = require('@cumulus/deployment/lib/message');
const { fetchMessageAdapter } = require('@cumulus/deployment/lib/adapter');

/**
* Download cumulus message adapter (CMA) and unzip it
*
* @param {string} version - cumulus message adapter version number (optional)
* @returns {Promise.<Object>} an object with path to the zip and extracted CMA
*/
async function downloadCMA(version) {
// download and unzip the message adapter
const gitPath = 'cumulus-nasa/cumulus-message-adapter';
const filename = 'cumulus-message-adapter.zip';
const src = path.join(process.cwd(), 'tests', `${randomString()}.zip`);
const dest = path.join(process.cwd(), 'tests', randomString());
await fetchMessageAdapter(version, gitPath, filename, src, dest);
return {
src,
dest
};
}

/**
* Copy cumulus message adapter python folder to each task
Expand Down Expand Up @@ -50,13 +71,13 @@ function deleteCMAFromTasks(workflow, cmaFolder) {
* @returns {Object} the generated cumulus message
*/
function messageBuilder(workflow, configOverride, cfOutputs) {
const workflowConfigs = {}
const workflowConfigs = {};
workflow.steps.forEach((step) => {
workflowConfigs[step.name] = step.cumulusConfig;
});

const config = {
stack: 'somestack',
stack: 'somestack',
workflowConfigs: {
[workflow.name]: workflowConfigs
}
Expand All @@ -73,7 +94,7 @@ function messageBuilder(workflow, configOverride, cfOutputs) {
* Runs a given workflow step (task)
*
* @param {string} lambdaPath - the local path to the task (e.g. path/to/task)
* @param {string} lambdaHandler - the lambda handler (e.g. index.hanlder)
* @param {string} lambdaHandler - the lambda handler (e.g. index.hanlder)
* @param {Object} message - the cumulus message input for the task
* @param {string} stepName - name of the step/task
* @returns {Promise.<Object>} the cumulus message returned by the task
Expand All @@ -82,21 +103,18 @@ async function runStep(lambdaPath, lambdaHandler, message, stepName) {
const taskFullPath = path.join(process.cwd(), lambdaPath);
const src = path.join(taskFullPath, 'adapter.zip');
const dest = path.join(taskFullPath, 'cumulus-message-adapter');
let resp;

process.env.CUMULUS_MESSAGE_ADAPTER_DIR = dest;

// add step name to the message
message.cumulus_meta.task = stepName;

try {
// add message adapter to task folder

// run the task
const moduleFn = lambdaHandler.split('.');
const moduleFileName = moduleFn[0];
const moduleFunctionName = moduleFn[1];
const task = require(`${taskFullPath}/${moduleFileName}`);
const task = require(`${taskFullPath}/${moduleFileName}`); // eslint-disable-line global-require

console.log(`Started execution of ${stepName}`);

Expand Down Expand Up @@ -132,7 +150,7 @@ async function runWorkflow(workflow, message) {
let stepInput = clone(message);

for (const step of workflow.steps) {
stepInput = await runStep(step.lambda, step.handler, stepInput, step.name)
stepInput = await runStep(step.lambda, step.handler, stepInput, step.name);
trail.stepOutputs[step.name] = clone(stepInput);
}
trail.output = clone(stepInput);
Expand All @@ -141,6 +159,7 @@ async function runWorkflow(workflow, message) {
}

module.exports = {
downloadCMA,
copyCMAToTasks,
deleteCMAFromTasks,
runStep,
Expand Down
64 changes: 32 additions & 32 deletions tests/fixtures/collections.json
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
{
"MOD09GQ": {
"name": "MOD09GQ",
"version": "006",
"dataType": "MOD09GQ",
"process": "modis",
"provider_path": "/pdrs",
"granuleId": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$",
"sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf",
"granuleIdExtraction": "(MOD09GQ\\.(.*))\\.hdf",
"files": [
{
"regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$",
"bucket": "protected",
"sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf"
},
{
"regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf\\.met$",
"bucket": "private",
"sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf.met"
},
{
"regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.meta\\.xml$",
"bucket": "protected",
"sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.meta.xml"
},
{
"regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$",
"bucket": "public",
"sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104_1.jpg"
}
]
}
"MOD09GQ": {
"name": "MOD09GQ",
"version": "006",
"dataType": "MOD09GQ",
"process": "modis",
"provider_path": "/pdrs",
"granuleId": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$",
"sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf",
"granuleIdExtraction": "(MOD09GQ\\.(.*))\\.hdf",
"files": [
{
"regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$",
"bucket": "protected",
"sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf"
},
{
"regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf\\.met$",
"bucket": "private",
"sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf.met"
},
{
"regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.meta\\.xml$",
"bucket": "protected",
"sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.meta.xml"
},
{
"regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$",
"bucket": "public",
"sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104_1.jpg"
}
]
}
}
44 changes: 22 additions & 22 deletions tests/fixtures/providers.json
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
{
"ftp": {
"id": "ftp",
"protocol": "ftp",
"host": "localhost",
"username": "testuser",
"password": "testpass"
},
"http": {
"id": "http",
"protocol": "http",
"host": "http://localhost:3030",
"username": "fake",
"password": "fake"
},
"sftp": {
"id": "MODAPS",
"protocol": "sftp",
"host": "localhost",
"port": 2222,
"username": "user",
"password": "password"
}
"ftp": {
"id": "ftp",
"protocol": "ftp",
"host": "localhost",
"username": "testuser",
"password": "testpass"
},
"http": {
"id": "http",
"protocol": "http",
"host": "http://localhost:3030",
"username": "fake",
"password": "fake"
},
"sftp": {
"id": "MODAPS",
"protocol": "sftp",
"host": "localhost",
"port": 2222,
"username": "user",
"password": "password"
}
}
102 changes: 51 additions & 51 deletions tests/fixtures/workflows/pdr_parse_ingest.json
Original file line number Diff line number Diff line change
@@ -1,55 +1,55 @@
{
"DiscoverPdrs": {
"collection": "MOD09GQ",
"provider": "ftp",
"DiscoverPdrs": {
"collection": "MOD09GQ",
"provider": "ftp",
"name": "DiscoverPdrs",
"steps": [
{
"name": "DiscoverPdrs",
"steps": [
{
"name": "DiscoverPdrs",
"lambda": "cumulus/tasks/discover-pdrs",
"handler": "index.handler",
"cumulusConfig": {
"templateUri": "{{$.meta.templates.ParsePdr}}",
"useQueue": true,
"queueUrl": "{{$.meta.queues.startSF}}",
"stack": "{{$.meta.stack}}",
"provider": "{{$.meta.provider}}",
"bucket": "{{$.meta.buckets.internal}}",
"collection": "{{$.meta.collection}}"
}
},
{
"name": "QueuePdrs",
"lambda": "cumulus/tasks/queue-pdrs",
"handler": "index.handler",
"cumulusConfig": {
"provider": "{{$.meta.provider}}",
"collection": "{{$.meta.collection}}",
"queueUrl": "{{$.meta.queues.startSF}}",
"parsePdrMessageTemplateUri": "{{$.meta.templates.ParsePdr}}"
}
}
]
},
"ParsePdr": {
"collection": "MOD09GQ",
"provider": "ftp",
"lambda": "cumulus/tasks/discover-pdrs",
"handler": "index.handler",
"cumulusConfig": {
"templateUri": "{{$.meta.templates.ParsePdr}}",
"useQueue": true,
"queueUrl": "{{$.meta.queues.startSF}}",
"stack": "{{$.meta.stack}}",
"provider": "{{$.meta.provider}}",
"bucket": "{{$.meta.buckets.internal}}",
"collection": "{{$.meta.collection}}"
}
},
{
"name": "QueuePdrs",
"lambda": "cumulus/tasks/queue-pdrs",
"handler": "index.handler",
"cumulusConfig": {
"provider": "{{$.meta.provider}}",
"collection": "{{$.meta.collection}}",
"queueUrl": "{{$.meta.queues.startSF}}",
"parsePdrMessageTemplateUri": "{{$.meta.templates.ParsePdr}}"
}
}
]
},
"ParsePdr": {
"collection": "MOD09GQ",
"provider": "ftp",
"name": "ParsePdr",
"steps": [
{
"name": "ParsePdr",
"steps": [
{
"name": "ParsePdr",
"lambda": "cumulus/tasks/parse-pdr",
"handler": "index.handler",
"cumulusConfig": {
"useQueue": true,
"provider": "{{$.meta.provider}}",
"collection": "{{$.meta.collection}}",
"bucket": "{{$.meta.buckets.internal}}",
"stack": "{{$.meta.stack}}",
"templateUri": "{{$.meta.templates.IngestGranule}}",
"queueUrl": "{{$.meta.queues.startSF}}"
}
}
]
}
"lambda": "cumulus/tasks/parse-pdr",
"handler": "index.handler",
"cumulusConfig": {
"useQueue": true,
"provider": "{{$.meta.provider}}",
"collection": "{{$.meta.collection}}",
"bucket": "{{$.meta.buckets.internal}}",
"stack": "{{$.meta.stack}}",
"templateUri": "{{$.meta.templates.IngestGranule}}",
"queueUrl": "{{$.meta.queues.startSF}}"
}
}
]
}
}
9 changes: 4 additions & 5 deletions tests/ftp_pdr_parse_ingest.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const fs = require('fs-extra');
const { fetchMessageAdapter } = require('../packages/deployment/lib/adapter');
const {
runWorkflow,
downloadCMA,
copyCMAToTasks,
deleteCMAFromTasks,
messageBuilder
Expand Down Expand Up @@ -36,11 +37,9 @@ test.before(async() => {
await s3().createBucket({ Bucket: context.internal }).promise();

// download and unzip the message adapter
const gitPath = 'cumulus-nasa/cumulus-message-adapter';
const filename = 'cumulus-message-adapter.zip';
context.src = path.join(process.cwd(), 'tests', `${randomString()}.zip`);
context.dest = path.join(process.cwd(), 'tests', randomString());
await fetchMessageAdapter(null, gitPath, filename, context.src, context.dest);
const { src, dest } = await downloadCMA();
context.src = src;
context.dest = dest;

// create the queue
context.queueUrl = await createQueue();
Expand Down
10 changes: 4 additions & 6 deletions tests/sftp_pdr_parse_ingest.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
const path = require('path');
const test = require('ava');
const fs = require('fs-extra');
const { fetchMessageAdapter } = require('../packages/deployment/lib/adapter');
const {
runWorkflow,
downloadCMA,
copyCMAToTasks,
deleteCMAFromTasks,
messageBuilder
Expand Down Expand Up @@ -36,11 +36,9 @@ test.before(async() => {
await s3().createBucket({ Bucket: context.internal }).promise();

// download and unzip the message adapter
const gitPath = 'cumulus-nasa/cumulus-message-adapter';
const filename = 'cumulus-message-adapter.zip';
context.src = path.join(process.cwd(), 'tests', `${randomString()}.zip`);
context.dest = path.join(process.cwd(), 'tests', randomString());
await fetchMessageAdapter(null, gitPath, filename, context.src, context.dest);
const { src, dest } = await downloadCMA();
context.src = src;
context.dest = dest;

// create the queue
context.queueUrl = await createQueue();
Expand Down

0 comments on commit 29c7d8d

Please sign in to comment.