From da12779a4eae740398652632ad6dc182a14219c2 Mon Sep 17 00:00:00 2001 From: Marc Huffnagle Date: Thu, 1 Mar 2018 15:16:32 -0500 Subject: [PATCH 1/4] CUMULUS-350 Update queue-granules to use the message adapter Removes a lot of extra logic from the granule queueing function. --- cumulus/tasks/discover-granules/index.js | 2 +- cumulus/tasks/queue-granules/index.js | 74 +++--- cumulus/tasks/queue-granules/package.json | 4 +- .../tasks/queue-granules/schemas/config.json | 19 ++ .../queue-granules/schemas/config.json.txt | 73 ------ .../schemas/{input.json.txt => input.json} | 14 +- .../schemas/{output.json.txt => output.json} | 5 +- .../tasks/queue-granules/tests/.eslintrc.json | 5 + cumulus/tasks/queue-granules/tests/index.js | 228 +++++++++++++++--- packages/api/models/rules.js | 2 +- packages/common/aws.js | 80 +----- packages/ingest/queue.js | 87 +++---- packages/ingest/test/queue.js | 96 -------- 13 files changed, 301 insertions(+), 388 deletions(-) create mode 100644 cumulus/tasks/queue-granules/schemas/config.json delete mode 100644 cumulus/tasks/queue-granules/schemas/config.json.txt rename cumulus/tasks/queue-granules/schemas/{input.json.txt => input.json} (53%) rename cumulus/tasks/queue-granules/schemas/{output.json.txt => output.json} (65%) create mode 100644 cumulus/tasks/queue-granules/tests/.eslintrc.json delete mode 100644 packages/ingest/test/queue.js diff --git a/cumulus/tasks/discover-granules/index.js b/cumulus/tasks/discover-granules/index.js index 73e786c5b8b..b289fe73415 100644 --- a/cumulus/tasks/discover-granules/index.js +++ b/cumulus/tasks/discover-granules/index.js @@ -6,7 +6,7 @@ const log = require('@cumulus/common/log'); /** * Discover granules -* See schemas/input.json for detailed input schema +* See schemas/input.json and schemas/config.json for detailed event description * * @param {Object} event - Lambda event object * @returns {Promise} - see schemas/output.json for detailed output schema diff --git a/cumulus/tasks/queue-granules/index.js b/cumulus/tasks/queue-granules/index.js index 4f2768a8f60..2947571370e 100644 --- a/cumulus/tasks/queue-granules/index.js +++ b/cumulus/tasks/queue-granules/index.js @@ -1,52 +1,42 @@ 'use strict'; -const queueGranule = require('@cumulus/ingest/queue').queueGranule; -const log = require('@cumulus/common/log'); +const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js'); +const { enqueueGranuleIngestMessage } = require('@cumulus/ingest/queue'); /** -* Callback function provided by aws lambda. See https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-handler.html#nodejs-prog-model-handler-callback -* @callback lambdaCallback -* @param {object} error -* @param {object} output - output object matching schemas/output.json -* @param {integer} output.granules_queued -*/ - -/** -* For each Granule, generate a new SF messages send to the step function queue to be executed -* @param {object} event lambda event object -* @param {object} event.input -* @param {array} event.input.granules -* @param {object} context Lambda context object. 
See https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-context.html -* @param {lambdaCallback} callback callback function -* @return {undefined} +* See schemas/input.json and schemas/config.json for detailed event description +* +* @param {Object} event - Lambda event object +* @returns {Promise} - see schemas/output.json for detailed output schema +* that is passed to the next task in the workflow **/ -function handler(event, context, cb) { - const config = event.config; - const stack = config.stack; - const bucket = config.bucket; - const queueUrl = config.queueUrl; - const templateUri = config.templateUri; - const provider = config.provider; - const collection = config.collection; +async function queueGranules(event) { const granules = event.input.granules || []; - const queuedGranules = granules.map(g => queueGranule( - g, - queueUrl, - templateUri, - provider, - collection, - null, - stack, - bucket - )); + await Promise.all( + granules.map((granule) => enqueueGranuleIngestMessage( + granule, + event.config.queueUrl, + event.config.granuleIngestMessageTemplateUri, + event.config.provider, + event.config.collection, + event.input.pdr + )) + ); - return Promise.all(queuedGranules).then(() => { - cb(null, { granules_queued: queuedGranules.length }); - }).catch((e) => { - log.error(e); - cb(e); - }); + return { granules_queued: granules.length }; } +exports.queueGranules = queueGranules; -module.exports.handler = handler; +/** + * Lambda handler + * + * @param {Object} event - a Cumulus Message + * @param {Object} context - an AWS Lambda context + * @param {Function} callback - an AWS Lambda handler + * @returns {undefined} - does not return a value + */ +function handler(event, context, callback) { + cumulusMessageAdapter.runCumulusTask(queueGranules, event, context, callback); +} +exports.handler = handler; diff --git a/cumulus/tasks/queue-granules/package.json b/cumulus/tasks/queue-granules/package.json index d2468374dce..4e5e136d189 100644 --- a/cumulus/tasks/queue-granules/package.json +++ b/cumulus/tasks/queue-granules/package.json @@ -39,6 +39,7 @@ "license": "Apache-2.0", "dependencies": { "@cumulus/common": "^1.0.1", + "@cumulus/cumulus-message-adapter-js": "0.0.1-beta.3", "@cumulus/ingest": "^1.0.1", "babel-core": "^6.25.0", "babel-loader": "^6.2.4", @@ -51,6 +52,7 @@ }, "devDependencies": { "@mapbox/mock-aws-sdk-js": "0.0.5", - "ava": "^0.21.0" + "ava": "^0.21.0", + "lodash": "^4.17.5" } } diff --git a/cumulus/tasks/queue-granules/schemas/config.json b/cumulus/tasks/queue-granules/schemas/config.json new file mode 100644 index 00000000000..8b4539b3f71 --- /dev/null +++ b/cumulus/tasks/queue-granules/schemas/config.json @@ -0,0 +1,19 @@ +{ + "title": "QueueGranulesConfig", + "description": "Describes the config used by the queue-granules task", + "type": "object", + "required": [ + "collection", + "provider", + "queueUrl", + "stack", + "granuleIngestMessageTemplateUri" + ], + "properties": { + "collection": { "type": "object" }, + "provider": { "type": "object" }, + "queueUrl": { "type": "string" }, + "stack": { "type": "string" }, + "granuleIngestMessageTemplateUri": { "type": "string" } + } +} diff --git a/cumulus/tasks/queue-granules/schemas/config.json.txt b/cumulus/tasks/queue-granules/schemas/config.json.txt deleted file mode 100644 index 37d37c6ca5e..00000000000 --- a/cumulus/tasks/queue-granules/schemas/config.json.txt +++ /dev/null @@ -1,73 +0,0 @@ -{ - "title": "QueueGranulesConfig", - "description": "Describes the config used by the queue-granules task", - 
"type": "object", - "properties": { - "stack": { "type": "string" }, - "provider": { - "type": "object", - "properties": { - "id": { "type": "string" }, - "globalConnectionLimit": { "type": "integer" }, - "protocol": { - "type": "string", - "enum": ["ftp", "sftp", "http", "https"] - } - } - }, - "buckets": { - "type": "object", - "description": "aws s3 buckets used by this task", - "properties": { - "internal": { "type": "string" }, - "private": { "type": "string" }, - "protected": { "type": "string" }, - "public": { "type": "string" } - } - }, - "queues": { - "type": "object", - "properties": { - "startSF": { "type": "string" } - } - }, - "templates": { - "type": "object" - }, - "cumulus_meta": { - "type": "object", - "properties": { - "config": { - "type": "object", - "properties": { - "next": { "type": "string" } - } - } - } - }, - "collection": { - "type": "object", - "description": "", - "required": ["name"], - "properties": { - "name": { "type": "string" }, - "version": { "type": "string" }, - "provider_path": { "type": "string" }, - "granuleId": { "type": "string" }, - "sampleFileName": { "type": "string" }, - "granuleIdExtraction": { "type": "string" }, - "files": { - "type": "array", - "items": { - "type": "object", - "properties": { - "regex": { "type": "string" }, - "sampleFileName": { "type": "string" }, - "bucket": { "type": "string" } - } - } - } - } - } - } -} diff --git a/cumulus/tasks/queue-granules/schemas/input.json.txt b/cumulus/tasks/queue-granules/schemas/input.json similarity index 53% rename from cumulus/tasks/queue-granules/schemas/input.json.txt rename to cumulus/tasks/queue-granules/schemas/input.json index 48e29f3d865..02555973c9f 100644 --- a/cumulus/tasks/queue-granules/schemas/input.json.txt +++ b/cumulus/tasks/queue-granules/schemas/input.json @@ -2,16 +2,24 @@ "title": "QueueGranulesInput", "description": "Describes the input and config used by the queue-granules task", "type": "object", + "require": [ "granules" ], "properties": { + "pdr": { + "type": "object", + "required": ["name", "path"], + "properties": { + "name": { "type": "string" }, + "path": { "type": "string" } + } + }, "granules": { "type": "array", "items": { "type": "object", + "required": [ "files", "granuleId" ], "properties": { - "name": { "type": "string" }, "granuleId": { "type": "string" }, - "bucket": { "type": "string" }, - "url_path": { "type": "string" } + "files": { "type": "array" } } } } diff --git a/cumulus/tasks/queue-granules/schemas/output.json.txt b/cumulus/tasks/queue-granules/schemas/output.json similarity index 65% rename from cumulus/tasks/queue-granules/schemas/output.json.txt rename to cumulus/tasks/queue-granules/schemas/output.json index 54692833a71..96b21f7c268 100644 --- a/cumulus/tasks/queue-granules/schemas/output.json.txt +++ b/cumulus/tasks/queue-granules/schemas/output.json @@ -2,9 +2,8 @@ "title": "QueueGranulesOutput", "description": "Describes the output produced by the queue-granules task", "type": "object", + "required": [ "granules_queued" ], "properties": { - "granules_queued": { - "type": "integer" - } + "granules_queued": { "type": "integer" } } } diff --git a/cumulus/tasks/queue-granules/tests/.eslintrc.json b/cumulus/tasks/queue-granules/tests/.eslintrc.json new file mode 100644 index 00000000000..ada42bca77f --- /dev/null +++ b/cumulus/tasks/queue-granules/tests/.eslintrc.json @@ -0,0 +1,5 @@ +{ + "rules": { + "no-param-reassign": "off" + } +} diff --git a/cumulus/tasks/queue-granules/tests/index.js b/cumulus/tasks/queue-granules/tests/index.js 
index d48edf43108..2ce56a2abf9 100644 --- a/cumulus/tasks/queue-granules/tests/index.js +++ b/cumulus/tasks/queue-granules/tests/index.js @@ -1,53 +1,205 @@ -/* eslint-disable no-param-reassign */ 'use strict'; const test = require('ava'); -const MockAWS = require('@mapbox/mock-aws-sdk-js'); const { s3, sqs, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws'); -const { createQueue, randomString } = require('@cumulus/common/test-utils'); +const { + createQueue, + randomString, + validateConfig, + validateInput, + validateOutput +} = require('@cumulus/common/test-utils'); -const { handler } = require('../index'); -const inputJSON = require('./fixtures/input.json'); -const workflowTemplate = require('./fixtures/workflow-template.json'); - -const aws = require('@cumulus/common/aws'); +const { queueGranules } = require('../index'); test.beforeEach(async (t) => { - t.context.bucket = randomString(); - t.context.queueUrl = await createQueue(); - await s3().createBucket({ Bucket: t.context.bucket }).promise(); + t.context.templateBucket = randomString(); + await s3().createBucket({ Bucket: t.context.templateBucket }).promise(); + + t.context.messageTemplate = { + cumulus_meta: {}, + meta: {} + }; + const messageTemplateKey = `${randomString()}/template.json`; + await s3().putObject({ + Bucket: t.context.templateBucket, + Key: messageTemplateKey, + Body: JSON.stringify(t.context.messageTemplate) + }).promise(); + + t.context.event = { + config: { + collection: { name: 'collection-name' }, + provider: { name: 'provider-name' }, + queueUrl: await createQueue(), + stack: '', + granuleIngestMessageTemplateUri: `s3://${t.context.templateBucket}/${messageTemplateKey}` + }, + input: { + granules: [] + } + }; +}); + +test.afterEach(async (t) => { + await Promise.all([ + recursivelyDeleteS3Bucket(t.context.templateBucket), + sqs().deleteQueue({ QueueUrl: t.context.event.config.queueUrl }).promise() + ]); +}); + +test('The correct output is returned when granules are queued', async (t) => { + const event = t.context.event; + event.input.granules = [ + { granuleId: randomString(), files: [] }, + { granuleId: randomString(), files: [] } + ]; + + await validateConfig(t, event.config); + await validateInput(t, event.input); + + const output = await queueGranules(event); + + await validateOutput(t, output); + t.deepEqual(output, { granules_queued: 2 }); +}); + +test('The correct output is returned when no granules are queued', async (t) => { + const event = t.context.event; + event.input.granules = []; + + await validateConfig(t, event.config); + await validateInput(t, event.input); + + const output = await queueGranules(event); + + await validateOutput(t, output); + t.deepEqual(output, { granules_queued: 0 }); }); -test.afterEach.always((t) => - Promise.all([ - recursivelyDeleteS3Bucket(t.context.bucket), - sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise() - ])); +test('Granules are added to the queue', async (t) => { + const event = t.context.event; + event.input.granules = [ + { granuleId: randomString(), files: [] }, + { granuleId: randomString(), files: [] } + ]; + + await validateConfig(t, event.config); + await validateInput(t, event.input); + + const output = await queueGranules(event); -test('queue granules', async (t) => { - const Bucket = t.context.bucket; - const IngestGranuleTemplate = `s3://${Bucket}/dev/workflows/IngestGranule.json`; + await validateOutput(t, output); - await aws.s3().putObject({ - Bucket, - Key: 'dev/workflows/IngestGranule.json', - Body: 
JSON.stringify(workflowTemplate) + // Get messages from the queue + const receiveMessageResponse = await sqs().receiveMessage({ + QueueUrl: t.context.event.config.queueUrl, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 1 }).promise(); + const messages = receiveMessageResponse.Messages; - MockAWS.stub('StepFunctions', 'describeExecution').returns({ - promise: () => Promise.resolve({}) - }); - - const input = Object.assign({}, inputJSON); - input.config.templateUri = IngestGranuleTemplate; - input.config.bucket = t.context.bucket; - input.config.queueUrl = t.context.queueUrl; - - return handler(input, {}, (e, output) => { - t.ifError(e); - t.is(typeof output, 'object'); - t.is(output.granules_queued, 3); - MockAWS.StepFunctions.restore(); - }); + t.is(messages.length, 2); }); + +test('The correct message is enqueued without a PDR', async (t) => { + const fileName = randomString(); + const granuleId = randomString(); + + const event = t.context.event; + event.input.granules = [ + { + granuleId, + files: [{ name: fileName }] + } + ]; + + await validateConfig(t, event.config); + await validateInput(t, event.input); + + const output = await queueGranules(event); + + await validateOutput(t, output); + + // Get messages from the queue + const receiveMessageResponse = await sqs().receiveMessage({ + QueueUrl: t.context.event.config.queueUrl, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 1 + }).promise(); + const messages = receiveMessageResponse.Messages; + + const expectedMessage = { + cumulus_meta: {}, + meta: { + collection: { name: 'collection-name' }, + provider: { name: 'provider-name' } + }, + payload: { + granules: [ + { + granuleId, + files: [{ name: fileName }] + } + ] + } + }; + + t.deepEqual(JSON.parse(messages[0].Body), expectedMessage); +}); + +test('The correct message is enqueued without a PDR', async (t) => { + const fileName = randomString(); + const granuleId = randomString(); + const pdrName = randomString(); + const pdrPath = randomString(); + + const event = t.context.event; + event.input.granules = [ + { + granuleId, + files: [{ name: fileName }] + } + ]; + event.input.pdr = { name: pdrName, path: pdrPath }; + + await validateConfig(t, event.config); + await validateInput(t, event.input); + + const output = await queueGranules(event); + + await validateOutput(t, output); + + // Get messages from the queue + const receiveMessageResponse = await sqs().receiveMessage({ + QueueUrl: t.context.event.config.queueUrl, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 1 + }).promise(); + const messages = receiveMessageResponse.Messages; + + const expectedMessage = { + cumulus_meta: {}, + meta: { + collection: { name: 'collection-name' }, + provider: { name: 'provider-name' } + }, + payload: { + granules: [ + { + granuleId, + files: [{ name: fileName }] + } + ], + pdr: { + name: pdrName, + path: pdrPath + } + } + }; + + t.deepEqual(JSON.parse(messages[0].Body), expectedMessage); +}); + +test.todo('An appropriate error is thrown if the message template could not be fetched'); diff --git a/packages/api/models/rules.js b/packages/api/models/rules.js index 0b379da5953..92abeeceaa4 100644 --- a/packages/api/models/rules.js +++ b/packages/api/models/rules.js @@ -89,7 +89,7 @@ class Rule extends Manager { // makes sure the workflow exists const bucket = process.env.bucket; const key = `${process.env.stackName}/workflows/${item.workflow}.json`; - const exists = await aws.fileExists(bucket, key); + const exists = await aws.s3ObjectExists(bucket, key); if (!exists) { const err = { diff --git 
a/packages/common/aws.js b/packages/common/aws.js index 3a28d16c2f8..317129e6cb6 100644 --- a/packages/common/aws.js +++ b/packages/common/aws.js @@ -173,29 +173,6 @@ exports.downloadS3File = (s3Obj, filename) => { exports.getS3Object = (bucket, key) => exports.s3().getObject({ Bucket: bucket, Key: key }).promise(); -/** -* Check if a file exists in an S3 object -* @name fileExists -* @param {string} bucket name of the S3 bucket -* @param {string} key key of the file in the S3 bucket -* @returns {promise} returns the response from `S3.headObject` as a promise -**/ -exports.fileExists = async (bucket, key) => { - const s3 = exports.s3(); - - try { - const r = await s3.headObject({ Key: key, Bucket: bucket }).promise(); - return r; - } - catch (e) { - // if file is not return false - if (e.stack.match(/(NotFound)/)) { - return false; - } - throw e; - } -}; - exports.downloadS3Files = (s3Objs, dir, s3opts = {}) => { // Scrub s3Ojbs to avoid errors from the AWS SDK const scrubbedS3Objs = s3Objs.map(s3Obj => ({ @@ -399,9 +376,6 @@ exports.getPossiblyRemote = async (obj) => { exports.startPromisedSfnExecution = (params) => exports.sfn().startExecution(params).promise(); -exports.getSfnExecutionByName = (stateMachineArn, executionName) => - [stateMachineArn.replace(':stateMachine:', ':execution:'), executionName].join(':'); - const getCurrentSfnTaskWithoutRetry = async (stateMachineArn, executionName) => { const sfn = exports.sfn(); const executionArn = exports.getSfnExecutionByName(stateMachineArn, executionName); @@ -482,31 +456,22 @@ exports.fromSfnExecutionName = (str, delimiter = '__') => /** * Send a message to AWS SQS -* @param {string} queueUrl url of the SQS queue -* @param {string|object} message either string or object message. If an object it -* will be serialized into a JSON string. -* @return {promise} +* +* @param {string} queueUrl - url of the SQS queue +* @param {string|Object} message - either string or object message. If an +* object it will be serialized into a JSON string. 
+* @returns {Promise} - resolves when the message has been sent
 **/
 exports.sendSQSMessage = (queueUrl, message) => {
   let messageBody;
+  if (typeof message === 'string') messageBody = message;
+  else if (typeof message === 'object') messageBody = JSON.stringify(message);
+  else throw new Error('body type is not accepted');
 
-  if (typeof message === 'string') {
-    messageBody = message;
-  }
-  else if (typeof message === 'object') {
-    messageBody = JSON.stringify(message);
-  }
-  else {
-    throw new Error('body type is not accepted');
-  }
-
-  const params = {
+  return exports.sqs().sendMessage({
     MessageBody: messageBody,
     QueueUrl: queueUrl
-  };
-
-  const queue = exports.sqs();
-  return queue.sendMessage(params).promise();
+  }).promise();
 };
 
 /**
@@ -559,28 +524,3 @@ exports.setGranuleStatus = async (
   const executionArn = exports.getExecutionArn(stateMachineArn, executionName);
   await exports.s3().putObject(bucket, key, '', null, { executionArn, status }).promise();
 };
-
-/**
-* Get the status of a granule
-*
-* @name getGranuleStatus
-* @param {string} granuleId
-* @param {string} stack = the deployment stackname
-* @param {string} bucket - the deployment bucket name
-* @return {promise|boolean} if the granule does not exist, this returns `false`,
-* else returns a promise that resolves to an array of [status, arn]
-**/
-exports.getGranuleStatus = async (granuleId, stack, bucket) => {
-  const key = exports.getGranuleS3Params(granuleId, stack, bucket);
-  const exists = await exports.fileExists(bucket, key);
-
-  if (exists) {
-    const oarn = exists.Metadata.arn;
-    const status = exists.Metadata.status;
-    if (status === 'failed') {
-      return ['failed', oarn];
-    }
-    return ['completed', oarn];
-  }
-  return false;
-};
diff --git a/packages/ingest/queue.js b/packages/ingest/queue.js
index 07c14b60f9a..c2f9fa1ff7e 100644
--- a/packages/ingest/queue.js
+++ b/packages/ingest/queue.js
@@ -1,33 +1,21 @@
 'use strict';
 
-const get = require('lodash.get');
 const uuidv4 = require('uuid/v4');
 const {
   getS3Object,
   sendSQSMessage,
-  parseS3Uri,
-  getSfnExecutionByName,
-  getGranuleStatus
+  parseS3Uri
 } = require('@cumulus/common/aws');
 
 /**
  * Create a message from a template stored on S3
  *
  * @param {string} templateUri - S3 uri to the workflow template
- * @param {Object} provider - Cumulus provider object
- * @param {Object} collection - Cumulus collection object
 * @returns {Promise} message object
 **/
-async function getTemplate(templateUri, provider, collection) {
-
-  const parsedS3Uri = parseS3Uri(templateUri);
-  const data = await getS3Object(parsedS3Uri.Bucket, parsedS3Uri.Key);
-  const message = JSON.parse(data.Body);
-
-  message.meta.provider = provider;
-  message.meta.collection = collection;
-
-  return message;
+async function getMessageFromTemplate(templateUri) {
+  const data = await getS3Object(parseS3Uri(templateUri));
+  return JSON.parse(data.Body);
 }
 
 /**
@@ -42,7 +30,7 @@ async function getTemplate(templateUri, provider, collection) {
  * @returns {Promise} promise returned from SQS.sendMessage()
 **/
 async function queuePdr(queueUrl, templateUri, provider, collection, pdr) {
-  const message = await getTemplate(templateUri, provider, collection);
+  const message = await getMessageFromTemplate(templateUri, provider, collection);
 
   message.payload = { pdr };
   message.cumulus_meta.execution_name = uuidv4();
@@ -51,44 +39,28 @@ async function queuePdr(queueUrl, templateUri, provider, collection, pdr) {
 }
 
 /**
-
- * Create a message from a template stored on S3
- *
- * @param {object} granule
- * @param {string} 
templateUri - S3 uri to the workflow template - * @param {Object} provider - Cumulus provider object - * @param {Object} collection - Cumulus collection object - * @param {Object} pdr - the PDR object - * @param {string} pdr.name - name of the PDR - * @param {string} stack = the deployment stackname - * @param {string} bucket - the deployment bucket name - * @returns {promise} returns a promise that resolves to an array of [status, arn] - **/ -async function queueGranule( + * Enqueue a granule to be ingested + * + * @param {Object} granule - the granule to be enqueued for ingest + * @param {string} queueUrl - the SQS queue to add the message to + * @param {string} granuleIngestMessageTemplateUri - the S3 URI of template for + * a granule ingest message + * @param {Object} provider - the provider config to be attached to the message + * @param {Object} collection - the collection config to be attached to the + * message + * @param {Object} pdr - an optional PDR to be configured in the message payload + * @returns {Promise} - resolves when the message has been enqueued + */ +async function enqueueGranuleIngestMessage( granule, queueUrl, - templateUri, + granuleIngestMessageTemplateUri, provider, collection, - pdr, - stack, - bucket + pdr ) { - const message = await getTemplate(templateUri, provider, collection); - - // check if the granule is already processed - const status = await getGranuleStatus(granule.granuleId, stack, bucket); - - if (status) { - return status; - } - - // if size is larger than 450mb skip - for (const f of granule.files) { - if (f.fileSize > 450000000) { - return { completed: granule.granuleId }; - } - } + // Build the message from a template + const message = await getMessageFromTemplate(granuleIngestMessageTemplateUri); message.payload = { granules: [{ @@ -96,18 +68,13 @@ async function queueGranule( files: granule.files }] }; + if (pdr) message.payload.pdr = pdr; - if (pdr) { - message.payload.pdr = pdr; - } - - const executionName = uuidv4(); - const arn = getSfnExecutionByName(message.cumulus_meta.state_machine, executionName); + message.meta.provider = provider; + message.meta.collection = collection; - message.cumulus_meta.execution_name = executionName; - await sendSQSMessage(queueUrl, message); - return ['running', arn]; + return sendSQSMessage(queueUrl, message); } +exports.enqueueGranuleIngestMessage = enqueueGranuleIngestMessage; module.exports.queuePdr = queuePdr; -module.exports.queueGranule = queueGranule; diff --git a/packages/ingest/test/queue.js b/packages/ingest/test/queue.js deleted file mode 100644 index e7f6d9173ef..00000000000 --- a/packages/ingest/test/queue.js +++ /dev/null @@ -1,96 +0,0 @@ -'use strict'; - -const test = require('ava'); -const sinon = require('sinon'); -const { recursivelyDeleteS3Bucket, s3, sqs } = require('@cumulus/common/aws'); -const { createQueue, randomString } = require('@cumulus/common/test-utils'); -const { queueGranule } = require('../queue'); - -// Addresses CUMULUS-258 -test('queueGranule generates unique exeuction names', async (t) => { - // Setup - const internalBucketName = randomString(); - const templateBucketName = randomString(); - const templateKey = randomString(); - - // Create buckets - await Promise.all([ - s3().createBucket({ Bucket: internalBucketName }).promise(), - s3().createBucket({ Bucket: templateBucketName }).promise() - ]); - - // Updload the message template - const messageTemplate = { - cumulus_meta: { - state_machine: randomString() - }, - meta: {}, - payload: {}, - exception: null - }; - 
await s3().putObject({ - Bucket: templateBucketName, - Key: templateKey, - Body: JSON.stringify(messageTemplate) - }).promise(); - - const QueueUrl = await createQueue(); - - // Perform the test - const granuleIds = [ - 'MOD13Q1.A2016193.h05v13.006.2016215085023', - 'MOD13Q1.A2016193.h18v02.006.2016215090632' - ]; - - const event = { - config: { - bucket: internalBucketName, - collection: { name: 'MOD13Q1' }, - queueUrl: QueueUrl, - stack: randomString(), - templateUri: `s3://${templateBucketName}/${templateKey}` - }, - input: {} - }; - - // Stop time and enqueue the granules - this.clock = sinon.useFakeTimers(Date.now()); - await Promise.all(granuleIds.map((granuleId) => { - const granule = { granuleId, files: [] }; - return queueGranule( - granule, - event.config.queueUrl, - event.config.templateUri, - null, - event.config.collection, - null, - event.config.stack, - event.config.bucket - ); - })); - this.clock.restore(); - - // Get messages from the queue - const receiveMessageResponse = await sqs().receiveMessage({ - QueueUrl, - MaxNumberOfMessages: 10, - WaitTimeSeconds: 1 - }).promise(); - const messages = receiveMessageResponse.Messages; - - // Create a Set from the execution names fetched from SQS - const executionNames = messages - .map((message) => JSON.parse(message.Body)) - .map((body) => body.cumulus_meta.execution_name); - const setOfExecutionNames = new Set(executionNames); - - // Verify that there are two unique execution names - t.is(setOfExecutionNames.size, 2); - - // Cleanup - await Promise.all([ - recursivelyDeleteS3Bucket(internalBucketName), - recursivelyDeleteS3Bucket(templateBucketName), - sqs().deleteQueue({ QueueUrl }).promise() - ]); -}); From 45c8a9496136d5914df4777a1a77348011b4fbcc Mon Sep 17 00:00:00 2001 From: Marc Huffnagle Date: Thu, 1 Mar 2018 15:30:29 -0500 Subject: [PATCH 2/4] Undoing rules changes --- packages/api/models/rules.js | 2 +- packages/common/aws.js | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/packages/api/models/rules.js b/packages/api/models/rules.js index 92abeeceaa4..0b379da5953 100644 --- a/packages/api/models/rules.js +++ b/packages/api/models/rules.js @@ -89,7 +89,7 @@ class Rule extends Manager { // makes sure the workflow exists const bucket = process.env.bucket; const key = `${process.env.stackName}/workflows/${item.workflow}.json`; - const exists = await aws.s3ObjectExists(bucket, key); + const exists = await aws.fileExists(bucket, key); if (!exists) { const err = { diff --git a/packages/common/aws.js b/packages/common/aws.js index 317129e6cb6..cf2fe00ef44 100644 --- a/packages/common/aws.js +++ b/packages/common/aws.js @@ -173,6 +173,30 @@ exports.downloadS3File = (s3Obj, filename) => { exports.getS3Object = (bucket, key) => exports.s3().getObject({ Bucket: bucket, Key: key }).promise(); +/** +* Check if a file exists in an S3 object +* @name fileExists +* @param {string} bucket name of the S3 bucket +* @param {string} key key of the file in the S3 bucket +* @returns {promise} returns the response from `S3.headObject` as a promise +**/ +exports.fileExists = async (bucket, key) => { + const s3 = exports.s3(); + + try { + const r = await s3.headObject({ Key: key, Bucket: bucket }).promise(); + return r; + } + catch (e) { + // if file is not return false + if (e.stack.match(/(NotFound)/)) { + return false; + } + throw e; + } +}; + + exports.downloadS3Files = (s3Objs, dir, s3opts = {}) => { // Scrub s3Ojbs to avoid errors from the AWS SDK const scrubbedS3Objs = s3Objs.map(s3Obj => ({ From 
d60b60b1b8250d570b67a482a65227e4c41ddcae Mon Sep 17 00:00:00 2001 From: Marc Huffnagle Date: Thu, 1 Mar 2018 15:38:36 -0500 Subject: [PATCH 3/4] Fix introduced bug in queue.js --- packages/ingest/queue.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/ingest/queue.js b/packages/ingest/queue.js index c2f9fa1ff7e..b045a249f49 100644 --- a/packages/ingest/queue.js +++ b/packages/ingest/queue.js @@ -14,7 +14,8 @@ const { * @returns {Promise} message object **/ async function getMessageFromTemplate(templateUri) { - const data = await getS3Object(parseS3Uri(templateUri)); + const parsedS3Uri = parseS3Uri(templateUri); + const data = await getS3Object(parsedS3Uri.Bucket, parsedS3Uri.Key); return JSON.parse(data.Body); } From 0eb5d29226b0fd434dd004bd64d36cacb34cd829 Mon Sep 17 00:00:00 2001 From: Marc Huffnagle Date: Thu, 1 Mar 2018 19:49:27 -0500 Subject: [PATCH 4/4] Improve queue-granules tests --- .../tasks/queue-granules/schemas/config.json | 3 +- cumulus/tasks/queue-granules/tests/index.js | 81 ++++++++++++++----- packages/common/test-utils.js | 14 +++- 3 files changed, 74 insertions(+), 24 deletions(-) diff --git a/cumulus/tasks/queue-granules/schemas/config.json b/cumulus/tasks/queue-granules/schemas/config.json index 8b4539b3f71..adff13c2691 100644 --- a/cumulus/tasks/queue-granules/schemas/config.json +++ b/cumulus/tasks/queue-granules/schemas/config.json @@ -6,14 +6,13 @@ "collection", "provider", "queueUrl", - "stack", "granuleIngestMessageTemplateUri" ], + "additionalProperties": false, "properties": { "collection": { "type": "object" }, "provider": { "type": "object" }, "queueUrl": { "type": "string" }, - "stack": { "type": "string" }, "granuleIngestMessageTemplateUri": { "type": "string" } } } diff --git a/cumulus/tasks/queue-granules/tests/index.js b/cumulus/tasks/queue-granules/tests/index.js index 2ce56a2abf9..8ed631aa4d7 100644 --- a/cumulus/tasks/queue-granules/tests/index.js +++ b/cumulus/tasks/queue-granules/tests/index.js @@ -14,11 +14,15 @@ const { const { queueGranules } = require('../index'); test.beforeEach(async (t) => { + t.context.stateMachineArn = randomString(); + t.context.templateBucket = randomString(); await s3().createBucket({ Bucket: t.context.templateBucket }).promise(); t.context.messageTemplate = { - cumulus_meta: {}, + cumulus_meta: { + state_machine: t.context.stateMachineArn + }, meta: {} }; const messageTemplateKey = `${randomString()}/template.json`; @@ -33,7 +37,6 @@ test.beforeEach(async (t) => { collection: { name: 'collection-name' }, provider: { name: 'provider-name' }, queueUrl: await createQueue(), - stack: '', granuleIngestMessageTemplateUri: `s3://${t.context.templateBucket}/${messageTemplateKey}` }, input: { @@ -104,14 +107,20 @@ test('Granules are added to the queue', async (t) => { }); test('The correct message is enqueued without a PDR', async (t) => { - const fileName = randomString(); - const granuleId = randomString(); + const fileNameA = randomString(); + const granuleIdA = randomString(); + const fileNameB = randomString(); + const granuleIdB = randomString(); const event = t.context.event; event.input.granules = [ { - granuleId, - files: [{ name: fileName }] + granuleId: granuleIdA, + files: [{ name: fileNameA }] + }, + { + granuleId: granuleIdB, + files: [{ name: fileNameB }] } ]; @@ -122,16 +131,11 @@ test('The correct message is enqueued without a PDR', async (t) => { await validateOutput(t, output); - // Get messages from the queue - const receiveMessageResponse = await sqs().receiveMessage({ - 
QueueUrl: t.context.event.config.queueUrl, - MaxNumberOfMessages: 10, - WaitTimeSeconds: 1 - }).promise(); - const messages = receiveMessageResponse.Messages; - - const expectedMessage = { - cumulus_meta: {}, + const expectedMessages = {}; + expectedMessages[granuleIdA] = { + cumulus_meta: { + state_machine: t.context.stateMachineArn + }, meta: { collection: { name: 'collection-name' }, provider: { name: 'provider-name' } @@ -139,17 +143,50 @@ test('The correct message is enqueued without a PDR', async (t) => { payload: { granules: [ { - granuleId, - files: [{ name: fileName }] + granuleId: granuleIdA, + files: [{ name: fileNameA }] + } + ] + } + }; + expectedMessages[granuleIdB] = { + cumulus_meta: { + state_machine: t.context.stateMachineArn + }, + meta: { + collection: { name: 'collection-name' }, + provider: { name: 'provider-name' } + }, + payload: { + granules: [ + { + granuleId: granuleIdB, + files: [{ name: fileNameB }] } ] } }; - t.deepEqual(JSON.parse(messages[0].Body), expectedMessage); + // Get messages from the queue + const receiveMessageResponse = await sqs().receiveMessage({ + QueueUrl: t.context.event.config.queueUrl, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 1 + }).promise(); + const messages = receiveMessageResponse.Messages.map((message) => JSON.parse(message.Body)); + + const receivedGranuleIds = messages.map((message) => message.payload.granules[0].granuleId); + t.true(receivedGranuleIds.includes(granuleIdA)); + t.true(receivedGranuleIds.includes(granuleIdB)); + + t.is(messages.length, 2); + messages.forEach((message) => { + const granuleId = message.payload.granules[0].granuleId; + t.deepEqual(message, expectedMessages[granuleId]); + }); }); -test('The correct message is enqueued without a PDR', async (t) => { +test('The correct message is enqueued with a PDR', async (t) => { const fileName = randomString(); const granuleId = randomString(); const pdrName = randomString(); @@ -180,7 +217,9 @@ test('The correct message is enqueued without a PDR', async (t) => { const messages = receiveMessageResponse.Messages; const expectedMessage = { - cumulus_meta: {}, + cumulus_meta: { + state_machine: t.context.stateMachineArn + }, meta: { collection: { name: 'collection-name' }, provider: { name: 'provider-name' } diff --git a/packages/common/test-utils.js b/packages/common/test-utils.js index 4f32d078d4e..71f67cb9172 100644 --- a/packages/common/test-utils.js +++ b/packages/common/test-utils.js @@ -6,7 +6,6 @@ const crypto = require('crypto'); const path = require('path'); const url = require('url'); const aws = require('./aws'); -const { readFile } = require('fs'); const fs = require('fs-extra'); /** @@ -217,3 +216,16 @@ function findTestDataDirectory() { .then((gitRepoRoot) => path.join(gitRepoRoot, 'packages', 'test-data')); } exports.findTestDataDirectory = findTestDataDirectory; + +/** + * Prettify and display something to the console. + * + * This is only intended to be used during debugging. + * + * @param {Object|Array} object - an object or array to be stringifyed + * @returns {undefined} - no return value + */ +function jlog(object) { + console.log(JSON.stringify(object, null, 2)); +} +exports.jlog = jlog;