From e5747551284b0ce23467a2df9438dc74bf850914 Mon Sep 17 00:00:00 2001 From: Jenny Liu Date: Tue, 6 Mar 2018 15:13:23 -0500 Subject: [PATCH 1/9] update pdr stats --- cumulus/tasks/pdr-status-check/index.js | 3 ++- cumulus/tasks/pdr-status-check/schemas/input.json | 10 +++++++++- cumulus/tasks/pdr-status-check/schemas/output.json | 10 +++++++++- cumulus/tasks/pdr-status-check/tests/index.js | 6 ++++-- .../test-data/cumulus_messages/pdr-status-check.json | 6 +++++- 5 files changed, 29 insertions(+), 6 deletions(-) diff --git a/cumulus/tasks/pdr-status-check/index.js b/cumulus/tasks/pdr-status-check/index.js index 75f06f25815..5d90281fd28 100644 --- a/cumulus/tasks/pdr-status-check/index.js +++ b/cumulus/tasks/pdr-status-check/index.js @@ -117,7 +117,8 @@ function buildOutput(event, groupedExecutions) { isFinished: groupedExecutions.running.length === 0, running, failed, - completed + completed, + pdr: event.input.pdr }; if (!output.isFinished) { diff --git a/cumulus/tasks/pdr-status-check/schemas/input.json b/cumulus/tasks/pdr-status-check/schemas/input.json index 82fe1fe8e52..afdd94941de 100644 --- a/cumulus/tasks/pdr-status-check/schemas/input.json +++ b/cumulus/tasks/pdr-status-check/schemas/input.json @@ -23,6 +23,14 @@ }, "counter": { "type": "integer" }, "limit": { "type": "integer" }, - "isFinished": { "type": "boolean" } + "isFinished": { "type": "boolean" }, + "pdr": { + "type": "object", + "required": ["name", "path"], + "properties": { + "name": { "type": "string" }, + "path": { "type": "string" } + } + } } } diff --git a/cumulus/tasks/pdr-status-check/schemas/output.json b/cumulus/tasks/pdr-status-check/schemas/output.json index 7bcc74594a3..df4142e27c7 100644 --- a/cumulus/tasks/pdr-status-check/schemas/output.json +++ b/cumulus/tasks/pdr-status-check/schemas/output.json @@ -23,6 +23,14 @@ }, "counter": { "type": "integer" }, "limit": { "type": "integer" }, - "isFinished": { "type": "boolean" } + "isFinished": { "type": "boolean" }, + "pdr": { + "type": "object", + "required": ["name", "path"], + "properties": { + "name": { "type": "string" }, + "path": { "type": "string" } + } + } } } diff --git a/cumulus/tasks/pdr-status-check/tests/index.js b/cumulus/tasks/pdr-status-check/tests/index.js index 9b3d5b11fc6..0ee7350b2ef 100644 --- a/cumulus/tasks/pdr-status-check/tests/index.js +++ b/cumulus/tasks/pdr-status-check/tests/index.js @@ -9,7 +9,8 @@ const { checkPdrStatuses } = require('../index'); test('valid output when no running executions', (t) => { const event = { input: { - running: [] + running: [], + pdr: { name: 'test.PDR', path: 'test-path' } } }; @@ -19,7 +20,8 @@ test('valid output when no running executions', (t) => { isFinished: true, running: [], failed: [], - completed: [] + completed: [], + pdr: { name: 'test.PDR', path: 'test-path' } }; t.deepEqual(output, expectedOutput); diff --git a/packages/test-data/cumulus_messages/pdr-status-check.json b/packages/test-data/cumulus_messages/pdr-status-check.json index fa0f578d9ea..55fdb8c90be 100644 --- a/packages/test-data/cumulus_messages/pdr-status-check.json +++ b/packages/test-data/cumulus_messages/pdr-status-check.json @@ -85,7 +85,11 @@ "running": [ "arn:aws:states:us-east-1:000000000000:execution:LpdaacCumulusIngestGranuleS-pOyNXh5jeR4h:d5b6344a-36eb-4c97-a5cf-3f6e83f0692a" ], - "limit": 30 + "limit": 30, + "pdr": { + "name": "MOD09GQ_1granule_v2.PDR", + "path": "/" + } }, "exception": "None", "workflow_config": { From ca5634dcf1dad30f68ee94caddd69acbbe676f48 Mon Sep 17 00:00:00 2001 From: Jenny Liu Date: Fri, 9 Mar 2018 09:52:52 -0500 Subject: [PATCH 2/9] update pdr stats --- CHANGELOG.md | 5 ++ cumulus/tasks/pdr-status-check/index.js | 61 +++++++++++++------ cumulus/tasks/pdr-status-check/package.json | 1 + cumulus/tasks/pdr-status-check/tests/index.js | 38 ++++++++---- 4 files changed, 77 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fba62c472e8..515cd22db23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - `@cumulus/deployment` deploys DynamoDB streams for the Collections, Providers and Rules tables as well as a new lambda function called `dbIndexer`. The `dbIndexer` lambda has an event source mapping which listens to each of the DynamoDB streams. The dbIndexer lambda receives events referencing operations on the DynamoDB table and updates the elasticsearch cluster accordingly. - The `@cumulus/api` endpoints for collections, providers and rules _only_ query DynamoDB, with the exception of LIST endpoints and the collections' GET endpoint. +- **CUMULUS-260: "PDR page on dashboard only shows zeros."** The PDR stats in LPDAAC are all 0s, even if the dashboard has been fixed to retrieve the correct fields. The current version of pdr-status-check has a few issues. + - pdr is not included in the input/output schema. It's available from the input event. So the pdr status and stats are not updated when the ParsePdr workflow is complete. Adding the pdr to the input/output of the task will fix this. + - pdr-status-check doesn't update pdr stats which prevent the real time pdr progress from showing up in the dashboard. Added code to update pdr stats in pdr-status-check task. + - It's possible a execution is still in the queue and doesn't exist in sfn yet. Added code to handle 'ExecutionDoesNotExist' error when checking the execution status. + ### Updated - Broke up `kes.override.js` of @cumulus/deployment to multiple modules and moved to a new location - Expanded @cumulus/deployment test coverage diff --git a/cumulus/tasks/pdr-status-check/index.js b/cumulus/tasks/pdr-status-check/index.js index 5d90281fd28..4d80a49396f 100644 --- a/cumulus/tasks/pdr-status-check/index.js +++ b/cumulus/tasks/pdr-status-check/index.js @@ -5,6 +5,7 @@ const aws = require('@cumulus/common/aws'); const { IncompleteError } = require('@cumulus/common/errors'); const log = require('@cumulus/common/log'); const { justLocalRun } = require('@cumulus/common/local-helpers'); +const indexer = require('@cumulus/api/es/indexer'); // The default number of times to re-check for completion const defaultRetryLimit = 30; @@ -129,6 +130,20 @@ function buildOutput(event, groupedExecutions) { return output; } +/** + * update stats of the pdr in elasticsearch + * + * @param {Object} payload - the result of the pdr-status-check + * @returns {Promise.} - elasticsearch update response + */ +async function updatePdrStatuses(payload) { + const stats = { + processing: payload.running.length, + completed: payload.completed.length, + failed: payload.failed.length + }; + await indexer.partialRecordUpdate(null, payload.pdr.name, 'pdr', { stats }); +} /** * Checks a list of Step Function Executions to see if they are all in * terminal states. @@ -138,28 +153,40 @@ function buildOutput(event, groupedExecutions) { * @returns {Promise.} - an object describing the status of Step * Function executions related to a PDR */ -function checkPdrStatuses(event) { +async function checkPdrStatuses(event) { const runningExecutionArns = event.input.running || []; - const promisedExecutionDescriptions = runningExecutionArns.map((executionArn) => - aws.sfn().describeExecution({ executionArn }).promise()); - - return Promise.all(promisedExecutionDescriptions) - .then(groupExecutionsByStatus) - .then((groupedExecutions) => { - const counter = getCounterFromEvent(event) + 1; - const exceededLimit = counter >= getLimitFromEvent(event); + const executions = []; + for (const executionArn of runningExecutionArns) { + try { + const execution = await aws.sfn().describeExecution({ executionArn }).promise(); + executions.push(execution); + } + catch (e) { + log.debug(e); + // it's ok if a execution is still in the queue and has not be executed + if (e.errorType === 'ExecutionDoesNotExist') { + executions.push({ executionArn: executionArn, status: 'RUNNING' }); + } + else throw e; + } + } - const executionsAllDone = groupedExecutions.running.length === 0; + const groupedExecutions = groupExecutionsByStatus(executions); + const counter = getCounterFromEvent(event) + 1; + const exceededLimit = counter >= getLimitFromEvent(event); - if (!executionsAllDone && exceededLimit) { - throw new IncompleteError(`PDR didn't complete after ${counter} checks`); - } + const executionsAllDone = groupedExecutions.running.length === 0; + if (!executionsAllDone && exceededLimit) { + throw new IncompleteError(`PDR didn't complete after ${counter} checks`); + } - const output = buildOutput(event, groupedExecutions); - if (!output.isFinished) logStatus(output); - return output; - }); + const output = buildOutput(event, groupedExecutions); + if (!output.isFinished) logStatus(output); + const response = await updatePdrStatuses(output); + log.debug(response); + //await updatePdrStatuses(output); + return output; } exports.checkPdrStatuses = checkPdrStatuses; diff --git a/cumulus/tasks/pdr-status-check/package.json b/cumulus/tasks/pdr-status-check/package.json index 9afd90c5a48..6eb49b94d83 100644 --- a/cumulus/tasks/pdr-status-check/package.json +++ b/cumulus/tasks/pdr-status-check/package.json @@ -40,6 +40,7 @@ ] }, "dependencies": { + "@cumulus/api": "^1.1.1", "@cumulus/common": "^1.1.0", "@cumulus/cumulus-message-adapter-js": "^1.0.1", "@cumulus/ingest": "^1.1.1", diff --git a/cumulus/tasks/pdr-status-check/tests/index.js b/cumulus/tasks/pdr-status-check/tests/index.js index 0ee7350b2ef..e597b1014f6 100644 --- a/cumulus/tasks/pdr-status-check/tests/index.js +++ b/cumulus/tasks/pdr-status-check/tests/index.js @@ -4,8 +4,13 @@ const _ = require('lodash'); const test = require('ava'); const sinon = require('sinon'); const aws = require('@cumulus/common/aws'); +const indexer = require('@cumulus/api/es/indexer'); const { checkPdrStatuses } = require('../index'); +test.before(() => { + sinon.stub(indexer, 'partialRecordUpdate'); +}); + test('valid output when no running executions', (t) => { const event = { input: { @@ -43,7 +48,8 @@ test('error thrown when limit exceeded', (t) => { input: { running: ['arn:123'], counter: 2, - limit: 3 + limit: 3, + pdr: { name: 'test.PDR', path: 'test-path' } } }; @@ -63,38 +69,48 @@ test('returns the correct results in the nominal case', (t) => { 'arn:1': 'RUNNING', 'arn:2': 'SUCCEEDED', 'arn:3': 'FAILED', - 'arn:4': 'ABORTED' + 'arn:4': 'ABORTED', + 'arn:7': null + }; + const error = { + errorMessage: 'Execution Does Not Exist:arn', + errorType: 'ExecutionDoesNotExist', + stackTrace: [] }; const stubSfnClient = { describeExecution: ({ executionArn }) => ({ - promise: () => Promise.resolve({ - executionArn, - status: executionStatuses[executionArn] - }) + promise: () => { + if (executionStatuses[executionArn] === null) return Promise.reject(error); + return Promise.resolve({ + executionArn, + status: executionStatuses[executionArn] + }); + } }) }; - const stub = sinon.stub(aws, 'sfn').returns(stubSfnClient); + const stubsf = sinon.stub(aws, 'sfn').returns(stubSfnClient); const event = { input: { - running: ['arn:1', 'arn:2', 'arn:3', 'arn:4'], + running: ['arn:1', 'arn:2', 'arn:3', 'arn:4', 'arn:7'], completed: ['arn:5'], failed: [{ arn: 'arn:6', reason: 'OutOfCheese' }], counter: 5, - limit: 10 + limit: 10, + pdr: { name: 'test.PDR', path: 'test-path' } } }; return checkPdrStatuses(event) .then((output) => { - stub.restore(); + stubsf.restore(); t.false(output.isFinished); t.is(output.counter, 6); t.is(output.limit, 10); - t.deepEqual(output.running, ['arn:1']); + t.deepEqual(output.running, ['arn:1', 'arn:7']); t.deepEqual(output.completed.sort(), ['arn:2', 'arn:5'].sort()); t.is(output.failed.length, 3); From ff23e5df216413a94fe3f1c98a8a51bfa641d8e4 Mon Sep 17 00:00:00 2001 From: Jenny Liu Date: Fri, 9 Mar 2018 12:16:27 -0500 Subject: [PATCH 3/9] remove update pdr stats from pdr-status-check --- cumulus/tasks/pdr-status-check/index.js | 24 +++---------------- cumulus/tasks/pdr-status-check/tests/index.js | 10 ++------ 2 files changed, 5 insertions(+), 29 deletions(-) diff --git a/cumulus/tasks/pdr-status-check/index.js b/cumulus/tasks/pdr-status-check/index.js index 4d80a49396f..74847119843 100644 --- a/cumulus/tasks/pdr-status-check/index.js +++ b/cumulus/tasks/pdr-status-check/index.js @@ -5,7 +5,6 @@ const aws = require('@cumulus/common/aws'); const { IncompleteError } = require('@cumulus/common/errors'); const log = require('@cumulus/common/log'); const { justLocalRun } = require('@cumulus/common/local-helpers'); -const indexer = require('@cumulus/api/es/indexer'); // The default number of times to re-check for completion const defaultRetryLimit = 30; @@ -79,7 +78,8 @@ function logStatus(output) { * failed: [ * { arn: 'arn:456', reason: 'Workflow Aborted' } * ], - * completed: [] + * completed: [], + * pdr: {} * } * * @param {Object} event - the event that came into checkPdrStatuses @@ -130,20 +130,6 @@ function buildOutput(event, groupedExecutions) { return output; } -/** - * update stats of the pdr in elasticsearch - * - * @param {Object} payload - the result of the pdr-status-check - * @returns {Promise.} - elasticsearch update response - */ -async function updatePdrStatuses(payload) { - const stats = { - processing: payload.running.length, - completed: payload.completed.length, - failed: payload.failed.length - }; - await indexer.partialRecordUpdate(null, payload.pdr.name, 'pdr', { stats }); -} /** * Checks a list of Step Function Executions to see if they are all in * terminal states. @@ -163,9 +149,8 @@ async function checkPdrStatuses(event) { executions.push(execution); } catch (e) { - log.debug(e); // it's ok if a execution is still in the queue and has not be executed - if (e.errorType === 'ExecutionDoesNotExist') { + if (e.code === 'ExecutionDoesNotExist') { executions.push({ executionArn: executionArn, status: 'RUNNING' }); } else throw e; @@ -183,9 +168,6 @@ async function checkPdrStatuses(event) { const output = buildOutput(event, groupedExecutions); if (!output.isFinished) logStatus(output); - const response = await updatePdrStatuses(output); - log.debug(response); - //await updatePdrStatuses(output); return output; } exports.checkPdrStatuses = checkPdrStatuses; diff --git a/cumulus/tasks/pdr-status-check/tests/index.js b/cumulus/tasks/pdr-status-check/tests/index.js index e597b1014f6..474f1cf2592 100644 --- a/cumulus/tasks/pdr-status-check/tests/index.js +++ b/cumulus/tasks/pdr-status-check/tests/index.js @@ -4,13 +4,8 @@ const _ = require('lodash'); const test = require('ava'); const sinon = require('sinon'); const aws = require('@cumulus/common/aws'); -const indexer = require('@cumulus/api/es/indexer'); const { checkPdrStatuses } = require('../index'); -test.before(() => { - sinon.stub(indexer, 'partialRecordUpdate'); -}); - test('valid output when no running executions', (t) => { const event = { input: { @@ -73,9 +68,8 @@ test('returns the correct results in the nominal case', (t) => { 'arn:7': null }; const error = { - errorMessage: 'Execution Does Not Exist:arn', - errorType: 'ExecutionDoesNotExist', - stackTrace: [] + message: 'Execution Does Not Exist: arn', + code: 'ExecutionDoesNotExist' }; const stubSfnClient = { From 1f3a8abb07327c67f3d3abdced9d136af01676da Mon Sep 17 00:00:00 2001 From: Jenny Liu Date: Mon, 12 Mar 2018 20:22:55 -0400 Subject: [PATCH 4/9] use cumulus-message-adapter --- cumulus/tasks/pdr-status-check/tests/index.js | 4 +- cumulus/tasks/sf-sns-report/README.md | 27 ++++ cumulus/tasks/sf-sns-report/index.js | 127 ++++++++++++++++++ cumulus/tasks/sf-sns-report/package.json | 55 ++++++++ .../tasks/sf-sns-report/schemas/config.json | 28 ++++ .../tasks/sf-sns-report/tests/.eslintrc.json | 5 + cumulus/tasks/sf-sns-report/tests/index.js | 68 ++++++++++ cumulus/tasks/sf-sns-report/webpack.config.js | 19 +++ packages/common/aws.js | 18 ++- 9 files changed, 342 insertions(+), 9 deletions(-) create mode 100644 cumulus/tasks/sf-sns-report/README.md create mode 100644 cumulus/tasks/sf-sns-report/index.js create mode 100644 cumulus/tasks/sf-sns-report/package.json create mode 100644 cumulus/tasks/sf-sns-report/schemas/config.json create mode 100644 cumulus/tasks/sf-sns-report/tests/.eslintrc.json create mode 100644 cumulus/tasks/sf-sns-report/tests/index.js create mode 100644 cumulus/tasks/sf-sns-report/webpack.config.js diff --git a/cumulus/tasks/pdr-status-check/tests/index.js b/cumulus/tasks/pdr-status-check/tests/index.js index 474f1cf2592..d926496ef11 100644 --- a/cumulus/tasks/pdr-status-check/tests/index.js +++ b/cumulus/tasks/pdr-status-check/tests/index.js @@ -83,7 +83,7 @@ test('returns the correct results in the nominal case', (t) => { } }) }; - const stubsf = sinon.stub(aws, 'sfn').returns(stubSfnClient); + const stub = sinon.stub(aws, 'sfn').returns(stubSfnClient); const event = { input: { @@ -98,7 +98,7 @@ test('returns the correct results in the nominal case', (t) => { return checkPdrStatuses(event) .then((output) => { - stubsf.restore(); + stub.restore(); t.false(output.isFinished); t.is(output.counter, 6); diff --git a/cumulus/tasks/sf-sns-report/README.md b/cumulus/tasks/sf-sns-report/README.md new file mode 100644 index 00000000000..76f259532d6 --- /dev/null +++ b/cumulus/tasks/sf-sns-report/README.md @@ -0,0 +1,27 @@ +# @cumulus/queue-pdrs + +[![CircleCI](https://circleci.com/gh/cumulus-nasa/cumulus.svg?style=svg)](https://circleci.com/gh/cumulus-nasa/cumulus) + +Broadcast an incoming Cumulus message to SNS. This lambda function works with Cumulus Message Adapter, and it can be used anywhere in a step function workflow to report granule and PDR status. + +To report the PDR's progress as it's being processed, add the following step after each pdr-status-check: +` PdrStatusReport: + CumulusConfig: + cumulus_message: + input: '{$}' + outputs: + - source: '{$.payload}' + destination: '{$.payload}' + Type: Task + Resource: ${SfSnsReportLambdaFunction.Arn} +` + +## What is Cumulus? + +Cumulus is a cloud-based data ingest, archive, distribution and management prototype for NASA's future Earth science data streams. + +[Cumulus Documentation](https://cumulus-nasa.github.io/) + +## Contributing + +See [Cumulus README](https://github.com/cumulus-nasa/cumulus/blob/master/README.md#installing-and-deploying) diff --git a/cumulus/tasks/sf-sns-report/index.js b/cumulus/tasks/sf-sns-report/index.js new file mode 100644 index 00000000000..1ada2428d65 --- /dev/null +++ b/cumulus/tasks/sf-sns-report/index.js @@ -0,0 +1,127 @@ +'use strict'; + +const get = require('lodash.get'); +const { StepFunction } = require('@cumulus/ingest/aws'); +const { setGranuleStatus, sns } = require('@cumulus/common/aws'); +const errors = require('@cumulus/common/errors'); +const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js'); + +/** + * Determines if there was a valid exception in the input message + * + * @param {Object} event - aws event object + * @returns {boolean} true if there was an exception, false otherwise + */ +function eventFailed(event) { + if (event.exception) { + if (typeof event.exception === 'object') { + // this is needed to avoid flagging cases like "exception: {}" or "exception: 'none'" + if (Object.keys(event.exception).length > 0) { + return true; + } + } + } + // Error and error keys are not part of the cumulus message + // and if they appear in the message something is seriously wrong + else if (event.Error || event.error) { + return true; + } + return false; +} + +/** + * if the cumulus message shows that a previous step failed, + * this function extract the error message from the cumulus message + * and fail the function with that information. This ensures that the + * Step Function workflow fails with the correct error info + * + * @param {Object} event - aws event object + * @returns {undefined} throws an error and does not return anything + */ +function makeLambdaFunctionFail(event) { + const error = get(event, 'exception.Error', get(event, 'error.Error')); + const cause = get(event, 'exception.Cause', get(event, 'error.Cause')); + if (error) { + if (errors[error]) { + throw new errors[error](cause); + } + else if (error === 'TypeError') { + throw new TypeError(cause); + } + throw new Error(cause); + } + + throw new Error('Step Function failed for an unknown reason.'); +} + +/** + * Publishes incoming Cumulus Message in its entirety to + * a given SNS topic + * + * @param {Object} event - a Cumulus Message that has been sent through the + * Cumulus Message Adapter. See schemas/input.json for detailed input schema. + * @param {Object} event.config - configuration object for the task + * @param {Object} event.config.sfnEnd - indicate if it's the last step of the step function + * @param {string} event.config.stack - the name of the deployment stack + * @param {string} event.config.bucket - S3 bucket + * @param {string} event.config.stateMachine - current state machine + * @param {string} event.config.executionTime - execution time + * @returns {Promise.} - AWS SNS response. see schemas/output.json for detailed output + * schema that is passed to the next task in the workflow + */ +async function publishSnsMessage(event) { + const config = get(event, 'config', []); + const message = get(event, 'input', []); + + const finished = get(config, 'sfnEnd', false); + const topicArn = get(message, 'meta.topic_arn', null); + const failed = eventFailed(message); + + if (topicArn) { + // if this is the sns call at the end of the execution + if (finished) { + message.meta.status = failed ? 'failed' : 'completed'; + const granuleId = get(message, 'meta.granuleId', null); + if (granuleId) { + await setGranuleStatus( + granuleId, + get(config, 'stack', null), + get(config, 'bucket', null), + get(config, 'stateMachine', null), + get(config, 'executionName', null), + message.meta.status + ); + } + } + else { + message.meta.status = 'running'; + } + + await sns().publish({ + TopicArn: topicArn, + Message: JSON.stringify(message) + }).promise(); + } + + if (failed) { + makeLambdaFunctionFail(message); + } + + return message; +} + +exports.publishSnsMessage = publishSnsMessage; + +/** + * Lambda handler. It broadcasts an incoming Cumulus message to SNS + * + * @param {Object} event - a Cumulus Message + * @param {Object} context - an AWS Lambda context object + * @param {Function} callback - an AWS Lambda call back + * @returns {Promise} updated event object + */ +function handler(event, context, callback) { + return StepFunction.pullEvent(event).then((message) => + cumulusMessageAdapter.runCumulusTask(publishSnsMessage, message, context, callback)); +} +exports.handler = handler; diff --git a/cumulus/tasks/sf-sns-report/package.json b/cumulus/tasks/sf-sns-report/package.json new file mode 100644 index 00000000000..ccd80bfcd59 --- /dev/null +++ b/cumulus/tasks/sf-sns-report/package.json @@ -0,0 +1,55 @@ +{ + "name": "@cumulus/sf-sns-report", + "version": "1.1.1", + "description": "Broadcasts an incoming Cumulus message to SNS", + "main": "index.js", + "directories": { + "test": "tests" + }, + "repository": { + "type": "git", + "url": "https://github.com/cumulus-nasa/cumulus" + }, + "publishConfig": { + "access": "public" + }, + "scripts": { + "test": "env TEST=true ava tests/*.js", + "build": "webpack --progress", + "watch": "webpack --progress -w", + "postinstall": "npm run build" + }, + "ava": { + "babel": "inherit", + "require": [ + "babel-polyfill", + "babel-register" + ] + }, + "babel": { + "presets": [ + "es2015" + ], + "plugins": [ + "transform-async-to-generator" + ] + }, + "author": "Cumulus Authors", + "license": "Apache-2.0", + "dependencies": { + "@cumulus/common": "^1.1.0", + "@cumulus/cumulus-message-adapter-js": "^1.0.1", + "@cumulus/ingest": "^1.1.1", + "babel-core": "^6.25.0", + "babel-loader": "^6.2.4", + "babel-plugin-transform-async-to-generator": "^6.24.1", + "babel-polyfill": "^6.23.0", + "babel-preset-es2015": "^6.24.1", + "json-loader": "~0.5.7", + "lodash.get": "^4.4.2", + "webpack": "~1.12.13" + }, + "devDependencies": { + "ava": "^0.21.0" + } +} diff --git a/cumulus/tasks/sf-sns-report/schemas/config.json b/cumulus/tasks/sf-sns-report/schemas/config.json new file mode 100644 index 00000000000..7ab81fe80ea --- /dev/null +++ b/cumulus/tasks/sf-sns-report/schemas/config.json @@ -0,0 +1,28 @@ +{ + "title": "SfSnsReportConfig", + "description": "Describes the config used by the sf-sns-report task", + "type": "object", + "additionalProperties": false, + "properties": { + "sfnEnd": { + "description": "indicate if it's the last step of the step function.", + "type": "boolean" + }, + "stack": { + "description": "the name of the deployment stack (from meta.stack). Required when sfnEnd is true and granule status is reported.", + "type": "string" + }, + "bucket": { + "description": "S3 bucket (from meta.buckets.internal). Required when sfnEnd is true and granule status is reported.", + "type": "string" + }, + "stateMachine": { + "description": "current state machine (from cumulus_meta.state_machine). Required when sfnEnd is true and granule status is reported.", + "type": "string" + }, + "executionTime": { + "description": "execution time (from cumulus_meta.execution_name). Required when sfnEnd is true and granule status is reported.", + "type": "string" + } + } +} diff --git a/cumulus/tasks/sf-sns-report/tests/.eslintrc.json b/cumulus/tasks/sf-sns-report/tests/.eslintrc.json new file mode 100644 index 00000000000..ada42bca77f --- /dev/null +++ b/cumulus/tasks/sf-sns-report/tests/.eslintrc.json @@ -0,0 +1,5 @@ +{ + "rules": { + "no-param-reassign": "off" + } +} diff --git a/cumulus/tasks/sf-sns-report/tests/index.js b/cumulus/tasks/sf-sns-report/tests/index.js new file mode 100644 index 00000000000..59093ef5e7f --- /dev/null +++ b/cumulus/tasks/sf-sns-report/tests/index.js @@ -0,0 +1,68 @@ +'use strict'; + +const test = require('ava'); +const { recursivelyDeleteS3Bucket, s3 } = require('@cumulus/common/aws'); +const { publishSnsMessage } = require('../index'); +const { cloneDeep } = require('lodash'); +const { randomString } = require('@cumulus/common/test-utils'); + +test('send report when sfn is running', (t) => { + const event = { + input: { + meta: { topic_arn: 'test_topic_arn' }, + anykey: 'anyvalue' + } + }; + + return publishSnsMessage(cloneDeep(event)) + .then((output) => { + event.input.meta.status = 'running'; + t.deepEqual(output, event.input); + }); +}); + +test('send report when sfn is running and lamda function fails', (t) => { + const event = { + input: { + meta: { topic_arn: 'test_topic_arn' }, + exception: { + Error: 'TheError', + Cause: 'bucket not found' + }, + anykey: 'anyvalue' + } + }; + + return publishSnsMessage(cloneDeep(event)) + .catch((e) => { + t.is(e.message, event.input.exception.Cause); + }); +}); + +test('send report when sfn is finished and granule succeed', async (t) => { + const input = { + meta: { + topic_arn: 'test_topic_arn', + granuleId: randomString() + }, + anykey: 'anyvalue' + }; + const event = {}; + event.input = input; + event.config = {}; + event.config.sfnEnd = true; + event.config.stack = 'test_stack'; + event.config.bucket = randomString(); + event.config.stateMachine = + 'arn:aws:states:us-east-1:596205514787:stateMachine:TestCumulusParsePdrStateMach-K5Qk90fc8w4U'; + event.config.executionName = '7c543392-1da9-47f0-9c34-f43f6519412a'; + + await s3().createBucket({ Bucket: event.config.bucket }).promise(); + return publishSnsMessage(cloneDeep(event)) + .then((output) => { + event.input.meta.status = 'completed'; + t.deepEqual(output, event.input); + }) + .then(() => recursivelyDeleteS3Bucket(event.config.bucket)) + .catch(() => recursivelyDeleteS3Bucket(event.config.bucket).then(t.fail)); +}); diff --git a/cumulus/tasks/sf-sns-report/webpack.config.js b/cumulus/tasks/sf-sns-report/webpack.config.js new file mode 100644 index 00000000000..f0d512ee35d --- /dev/null +++ b/cumulus/tasks/sf-sns-report/webpack.config.js @@ -0,0 +1,19 @@ +module.exports = { + entry: ['babel-polyfill', './index.js'], + output: { + libraryTarget: 'commonjs2', + filename: 'dist/index.js' + }, + target: 'node', + devtool: 'sourcemap', + module: { + loaders: [{ + test: /\.js?$/, + exclude: /node_modules(?!\/@cumulus)/, + loader: 'babel' + }, { + test: /\.json$/, + loader: 'json' + }] + } +}; diff --git a/packages/common/aws.js b/packages/common/aws.js index 5477bbd89c7..1e85acecc52 100644 --- a/packages/common/aws.js +++ b/packages/common/aws.js @@ -64,6 +64,7 @@ exports.dynamodbstreams = awsClient(AWS.DynamoDBStreams, '2012-08-10'); exports.dynamodbDocClient = awsClient(AWS.DynamoDB.DocumentClient, '2012-08-10'); exports.sfn = awsClient(AWS.StepFunctions, '2016-11-23'); exports.cf = awsClient(AWS.CloudFormation, '2010-05-15'); +exports.sns = awsClient(AWS.SNS, '2010-03-31'); /** * Describes the resources belonging to a given CloudFormation stack @@ -529,14 +530,15 @@ exports.getGranuleS3Params = (granuleId, stack, bucket) => { /** * Set the status of a granule +* * @name setGranuleStatus -* @param {string} granuleId -* @param {string} stack = the deployment stackname +* @param {string} granuleId - granule id +* @param {string} stack - the deployment stackname * @param {string} bucket - the deployment bucket name -* @param {string} stateMachineArn -* @param {string} executionName -* @param {string} status -* @return {promise} returns the response from `S3.put` as a promise +* @param {string} stateMachineArn - statemachine arn +* @param {string} executionName - execution name +* @param {string} status - granule status +* @returns {Promise} returns the response from `S3.put` as a promise **/ exports.setGranuleStatus = async ( granuleId, @@ -548,5 +550,7 @@ exports.setGranuleStatus = async ( ) => { const key = exports.getGranuleS3Params(granuleId, stack, bucket); const executionArn = exports.getExecutionArn(stateMachineArn, executionName); - await exports.s3().putObject(bucket, key, '', null, { executionArn, status }).promise(); + const params = { Bucket: bucket, Key: key }; + params.Metadata = { executionArn, status }; + await exports.s3().putObject(params).promise(); }; From cd14fd01dc8219502fcd91553ef00c94a2a64ce4 Mon Sep 17 00:00:00 2001 From: Jenny Liu Date: Tue, 13 Mar 2018 09:29:28 -0400 Subject: [PATCH 5/9] update readme --- CHANGELOG.md | 2 +- cumulus/tasks/sf-sns-report/README.md | 34 ++++++++++++++++++++-- cumulus/tasks/sf-sns-report/tests/index.js | 21 ++++++++++++- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 515cd22db23..69996ac03dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - **CUMULUS-260: "PDR page on dashboard only shows zeros."** The PDR stats in LPDAAC are all 0s, even if the dashboard has been fixed to retrieve the correct fields. The current version of pdr-status-check has a few issues. - pdr is not included in the input/output schema. It's available from the input event. So the pdr status and stats are not updated when the ParsePdr workflow is complete. Adding the pdr to the input/output of the task will fix this. - - pdr-status-check doesn't update pdr stats which prevent the real time pdr progress from showing up in the dashboard. Added code to update pdr stats in pdr-status-check task. + - pdr-status-check doesn't update pdr stats which prevent the real time pdr progress from showing up in the dashboard. To solve this, added lambda function sf-sns-report which is copied from @cumulus/api/lambdas/sf-sns-broadcase with modification, sf-sns-report can be used to report step function status anywhere inside a step function. So add step sf-sns-report after each pdr-status-check, we will get the PDR status progress at real time. - It's possible a execution is still in the queue and doesn't exist in sfn yet. Added code to handle 'ExecutionDoesNotExist' error when checking the execution status. ### Updated diff --git a/cumulus/tasks/sf-sns-report/README.md b/cumulus/tasks/sf-sns-report/README.md index 76f259532d6..4b8636dce76 100644 --- a/cumulus/tasks/sf-sns-report/README.md +++ b/cumulus/tasks/sf-sns-report/README.md @@ -4,8 +4,23 @@ Broadcast an incoming Cumulus message to SNS. This lambda function works with Cumulus Message Adapter, and it can be used anywhere in a step function workflow to report granule and PDR status. -To report the PDR's progress as it's being processed, add the following step after each pdr-status-check: -` PdrStatusReport: +To report the PDR's progress as it's being processed, add the following step after the pdr-status-check: + + PdrStatusReport: + CumulusConfig: + cumulus_message: + input: '{$}' + outputs: + - source: '{$.payload}' + destination: '{$.payload}' + Type: Task + Resource: ${SfSnsReportLambdaFunction.Arn} + +To report the start status of the step function: + + StartAt: StatusReport + States: + StatusReport: CumulusConfig: cumulus_message: input: '{$}' @@ -14,7 +29,20 @@ To report the PDR's progress as it's being processed, add the following step aft destination: '{$.payload}' Type: Task Resource: ${SfSnsReportLambdaFunction.Arn} -` + +To report the final status of the step function: + + StopStatus: + CumulusConfig: + sfnEnd: true + stack: '{$.meta.stack}' + bucket: '{$.meta.buckets.internal}' + stateMachine: '{$.cumulus_meta.state_machine}' + executionTime: '{$.cumulus_meta.execution_name}' + cumulus_message: + input: '{$}' + Type: Task + Resource: ${SfSnsReportLambdaFunction.Arn} ## What is Cumulus? diff --git a/cumulus/tasks/sf-sns-report/tests/index.js b/cumulus/tasks/sf-sns-report/tests/index.js index 59093ef5e7f..bd0ac7f676f 100644 --- a/cumulus/tasks/sf-sns-report/tests/index.js +++ b/cumulus/tasks/sf-sns-report/tests/index.js @@ -21,7 +21,7 @@ test('send report when sfn is running', (t) => { }); }); -test('send report when sfn is running and lamda function fails', (t) => { +test('send report when sfn is running with exception', (t) => { const event = { input: { meta: { topic_arn: 'test_topic_arn' }, @@ -39,6 +39,25 @@ test('send report when sfn is running and lamda function fails', (t) => { }); }); +test('send report when sfn is running with error', (t) => { + const event = { + input: { + meta: { topic_arn: 'test_topic_arn' }, + error: { + Error: 'TypeError', + Cause: 'bucket not found' + }, + anykey: 'anyvalue' + } + }; + + return publishSnsMessage(cloneDeep(event)) + .catch((e) => { + t.is(e.message, event.input.error.Cause); + }); +}); + + test('send report when sfn is finished and granule succeed', async (t) => { const input = { meta: { From d93903039a5b9820d86de8f8b2e259f8ede81104 Mon Sep 17 00:00:00 2001 From: Jenny Liu Date: Tue, 13 Mar 2018 11:46:06 -0400 Subject: [PATCH 6/9] fixes --- CHANGELOG.md | 2 +- cumulus/tasks/pdr-status-check/package.json | 1 - cumulus/tasks/sf-sns-report/README.md | 2 +- cumulus/tasks/sf-sns-report/index.js | 21 ++++++++++----------- cumulus/tasks/sf-sns-report/tests/index.js | 2 +- 5 files changed, 13 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60aa4d92a9d..2e6e88949f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - **CUMULUS-260: "PDR page on dashboard only shows zeros."** The PDR stats in LPDAAC are all 0s, even if the dashboard has been fixed to retrieve the correct fields. The current version of pdr-status-check has a few issues. - pdr is not included in the input/output schema. It's available from the input event. So the pdr status and stats are not updated when the ParsePdr workflow is complete. Adding the pdr to the input/output of the task will fix this. - pdr-status-check doesn't update pdr stats which prevent the real time pdr progress from showing up in the dashboard. To solve this, added lambda function sf-sns-report which is copied from @cumulus/api/lambdas/sf-sns-broadcase with modification, sf-sns-report can be used to report step function status anywhere inside a step function. So add step sf-sns-report after each pdr-status-check, we will get the PDR status progress at real time. - - It's possible a execution is still in the queue and doesn't exist in sfn yet. Added code to handle 'ExecutionDoesNotExist' error when checking the execution status. + - It's possible an execution is still in the queue and doesn't exist in sfn yet. Added code to handle 'ExecutionDoesNotExist' error when checking the execution status. ### Updated - Broke up `kes.override.js` of @cumulus/deployment to multiple modules and moved to a new location diff --git a/cumulus/tasks/pdr-status-check/package.json b/cumulus/tasks/pdr-status-check/package.json index 6eb49b94d83..9afd90c5a48 100644 --- a/cumulus/tasks/pdr-status-check/package.json +++ b/cumulus/tasks/pdr-status-check/package.json @@ -40,7 +40,6 @@ ] }, "dependencies": { - "@cumulus/api": "^1.1.1", "@cumulus/common": "^1.1.0", "@cumulus/cumulus-message-adapter-js": "^1.0.1", "@cumulus/ingest": "^1.1.1", diff --git a/cumulus/tasks/sf-sns-report/README.md b/cumulus/tasks/sf-sns-report/README.md index 4b8636dce76..9bed835e460 100644 --- a/cumulus/tasks/sf-sns-report/README.md +++ b/cumulus/tasks/sf-sns-report/README.md @@ -1,4 +1,4 @@ -# @cumulus/queue-pdrs +# @cumulus/sf-sns-report [![CircleCI](https://circleci.com/gh/cumulus-nasa/cumulus.svg?style=svg)](https://circleci.com/gh/cumulus-nasa/cumulus) diff --git a/cumulus/tasks/sf-sns-report/index.js b/cumulus/tasks/sf-sns-report/index.js index 1ada2428d65..bb3ca58d1fb 100644 --- a/cumulus/tasks/sf-sns-report/index.js +++ b/cumulus/tasks/sf-sns-report/index.js @@ -13,13 +13,11 @@ const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js'); * @returns {boolean} true if there was an exception, false otherwise */ function eventFailed(event) { - if (event.exception) { - if (typeof event.exception === 'object') { - // this is needed to avoid flagging cases like "exception: {}" or "exception: 'none'" - if (Object.keys(event.exception).length > 0) { - return true; - } - } + // event has exception + // and it is needed to avoid flagging cases like "exception: {}" or "exception: 'none'" + if (event.exception && (typeof event.exception === 'object') && + (Object.keys(event.exception).length > 0)) { + return true; } // Error and error keys are not part of the cumulus message // and if they appear in the message something is seriously wrong @@ -31,8 +29,8 @@ function eventFailed(event) { /** * if the cumulus message shows that a previous step failed, - * this function extract the error message from the cumulus message - * and fail the function with that information. This ensures that the + * this function extracts the error message from the cumulus message + * and fails the function with that information. This ensures that the * Step Function workflow fails with the correct error info * * @param {Object} event - aws event object @@ -121,7 +119,8 @@ exports.publishSnsMessage = publishSnsMessage; * @returns {Promise} updated event object */ function handler(event, context, callback) { - return StepFunction.pullEvent(event).then((message) => - cumulusMessageAdapter.runCumulusTask(publishSnsMessage, message, context, callback)); + return StepFunction.pullEvent(event).then((message) => { + cumulusMessageAdapter.runCumulusTask(publishSnsMessage, message, context, callback); + }); } exports.handler = handler; diff --git a/cumulus/tasks/sf-sns-report/tests/index.js b/cumulus/tasks/sf-sns-report/tests/index.js index bd0ac7f676f..94da140047f 100644 --- a/cumulus/tasks/sf-sns-report/tests/index.js +++ b/cumulus/tasks/sf-sns-report/tests/index.js @@ -58,7 +58,7 @@ test('send report when sfn is running with error', (t) => { }); -test('send report when sfn is finished and granule succeed', async (t) => { +test('send report when sfn is finished and granule has succeeded', async (t) => { const input = { meta: { topic_arn: 'test_topic_arn', From 0edd3dadb59f171950dc14a317411be788113da9 Mon Sep 17 00:00:00 2001 From: Jenny Liu Date: Wed, 14 Mar 2018 09:33:15 -0400 Subject: [PATCH 7/9] pr updates --- cumulus/tasks/pdr-status-check/schemas/input.json | 7 ++++++- cumulus/tasks/pdr-status-check/schemas/output.json | 7 ++++++- cumulus/tasks/pdr-status-check/tests/index.js | 10 +++++----- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/cumulus/tasks/pdr-status-check/schemas/input.json b/cumulus/tasks/pdr-status-check/schemas/input.json index afdd94941de..7c8d82d1ba9 100644 --- a/cumulus/tasks/pdr-status-check/schemas/input.json +++ b/cumulus/tasks/pdr-status-check/schemas/input.json @@ -2,6 +2,7 @@ "title": "PdrStatusCheckInput", "description": "Describes the input expected by the pdr-status-check task", "type": "object", + "required": ["running", "pdr"], "properties": { "running": { "type": "array", @@ -23,8 +24,12 @@ }, "counter": { "type": "integer" }, "limit": { "type": "integer" }, - "isFinished": { "type": "boolean" }, + "isFinished": { + "description": "Indicates whether all the step function executions of the PDR are in terminal states", + "type": "boolean" + }, "pdr": { + "description": "Product Delivery Record", "type": "object", "required": ["name", "path"], "properties": { diff --git a/cumulus/tasks/pdr-status-check/schemas/output.json b/cumulus/tasks/pdr-status-check/schemas/output.json index df4142e27c7..0a9d4bfd381 100644 --- a/cumulus/tasks/pdr-status-check/schemas/output.json +++ b/cumulus/tasks/pdr-status-check/schemas/output.json @@ -2,6 +2,7 @@ "title": "PdrStatusCheckOutput", "description": "Describes the output produced by the pdr-status-check task", "type": "object", + "required": ["running", "completed", "failed", "isFinished", "pdr"], "properties": { "running": { "type": "array", @@ -23,8 +24,12 @@ }, "counter": { "type": "integer" }, "limit": { "type": "integer" }, - "isFinished": { "type": "boolean" }, + "isFinished": { + "description": "Indicates whether all the step function executions of the PDR are in terminal states", + "type": "boolean" + }, "pdr": { + "description": "Product Delivery Record", "type": "object", "required": ["name", "path"], "properties": { diff --git a/cumulus/tasks/pdr-status-check/tests/index.js b/cumulus/tasks/pdr-status-check/tests/index.js index d926496ef11..4c8edc8f416 100644 --- a/cumulus/tasks/pdr-status-check/tests/index.js +++ b/cumulus/tasks/pdr-status-check/tests/index.js @@ -67,15 +67,15 @@ test('returns the correct results in the nominal case', (t) => { 'arn:4': 'ABORTED', 'arn:7': null }; - const error = { - message: 'Execution Does Not Exist: arn', - code: 'ExecutionDoesNotExist' - }; const stubSfnClient = { describeExecution: ({ executionArn }) => ({ promise: () => { - if (executionStatuses[executionArn] === null) return Promise.reject(error); + if (!executionStatuses[executionArn]) { + const error = new Error(`Execution does not exist: ${executionArn}`); + error.code = 'ExecutionDoesNotExist'; + return Promise.reject(error); + } return Promise.resolve({ executionArn, status: executionStatuses[executionArn] From b2507ce25330312499b5e3687e3db3549e894d96 Mon Sep 17 00:00:00 2001 From: Jenny Liu Date: Wed, 14 Mar 2018 22:55:55 -0400 Subject: [PATCH 8/9] more fixes --- CHANGELOG.md | 2 +- cumulus/tasks/sf-sns-report/README.md | 9 +-- cumulus/tasks/sf-sns-report/index.js | 73 ++++++++++--------- .../tasks/sf-sns-report/schemas/config.json | 10 ++- cumulus/tasks/sf-sns-report/tests/index.js | 29 ++++++-- 5 files changed, 74 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1964f4686f8..d28ed7cdae7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - **CUMULUS-260: "PDR page on dashboard only shows zeros."** The PDR stats in LPDAAC are all 0s, even if the dashboard has been fixed to retrieve the correct fields. The current version of pdr-status-check has a few issues. - pdr is not included in the input/output schema. It's available from the input event. So the pdr status and stats are not updated when the ParsePdr workflow is complete. Adding the pdr to the input/output of the task will fix this. - - pdr-status-check doesn't update pdr stats which prevent the real time pdr progress from showing up in the dashboard. To solve this, added lambda function sf-sns-report which is copied from @cumulus/api/lambdas/sf-sns-broadcase with modification, sf-sns-report can be used to report step function status anywhere inside a step function. So add step sf-sns-report after each pdr-status-check, we will get the PDR status progress at real time. + - pdr-status-check doesn't update pdr stats which prevent the real time pdr progress from showing up in the dashboard. To solve this, added lambda function sf-sns-report which is copied from @cumulus/api/lambdas/sf-sns-broadcast with modification, sf-sns-report can be used to report step function status anywhere inside a step function. So add step sf-sns-report after each pdr-status-check, we will get the PDR status progress at real time. - It's possible an execution is still in the queue and doesn't exist in sfn yet. Added code to handle 'ExecutionDoesNotExist' error when checking the execution status. ### Updated diff --git a/cumulus/tasks/sf-sns-report/README.md b/cumulus/tasks/sf-sns-report/README.md index 9bed835e460..bb0d882e777 100644 --- a/cumulus/tasks/sf-sns-report/README.md +++ b/cumulus/tasks/sf-sns-report/README.md @@ -10,9 +10,7 @@ To report the PDR's progress as it's being processed, add the following step aft CumulusConfig: cumulus_message: input: '{$}' - outputs: - - source: '{$.payload}' - destination: '{$.payload}' + ResultPath: null Type: Task Resource: ${SfSnsReportLambdaFunction.Arn} @@ -24,9 +22,7 @@ To report the start status of the step function: CumulusConfig: cumulus_message: input: '{$}' - outputs: - - source: '{$.payload}' - destination: '{$.payload}' + ResultPath: null Type: Task Resource: ${SfSnsReportLambdaFunction.Arn} @@ -41,6 +37,7 @@ To report the final status of the step function: executionTime: '{$.cumulus_meta.execution_name}' cumulus_message: input: '{$}' + ResultPath: null Type: Task Resource: ${SfSnsReportLambdaFunction.Arn} diff --git a/cumulus/tasks/sf-sns-report/index.js b/cumulus/tasks/sf-sns-report/index.js index bb3ca58d1fb..772e163f26f 100644 --- a/cumulus/tasks/sf-sns-report/index.js +++ b/cumulus/tasks/sf-sns-report/index.js @@ -1,7 +1,6 @@ 'use strict'; const get = require('lodash.get'); -const { StepFunction } = require('@cumulus/ingest/aws'); const { setGranuleStatus, sns } = require('@cumulus/common/aws'); const errors = require('@cumulus/common/errors'); const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js'); @@ -15,20 +14,35 @@ const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js'); function eventFailed(event) { // event has exception // and it is needed to avoid flagging cases like "exception: {}" or "exception: 'none'" - if (event.exception && (typeof event.exception === 'object') && - (Object.keys(event.exception).length > 0)) { - return true; - } + if ((event.exception) && (typeof event.exception === 'object') && + (Object.keys(event.exception).length > 0)) return true; + // Error and error keys are not part of the cumulus message // and if they appear in the message something is seriously wrong - else if (event.Error || event.error) { - return true; - } + else if (event.Error || event.error) return true; + return false; } /** - * if the cumulus message shows that a previous step failed, + * Builds error object based on error type + * + * @param {string} type - error type + * @param {string} cause - error cause + * @returns {Object} the error object + */ +function buildError(type, cause) { + let ErrorClass; + + if (Object.keys(errors).includes(type)) ErrorClass = errors[type]; + else if (type === 'TypeError') ErrorClass = TypeError; + else ErrorClass = Error; + + return new ErrorClass(cause); +} + +/** + * If the cumulus message shows that a previous step failed, * this function extracts the error message from the cumulus message * and fails the function with that information. This ensures that the * Step Function workflow fails with the correct error info @@ -37,17 +51,9 @@ function eventFailed(event) { * @returns {undefined} throws an error and does not return anything */ function makeLambdaFunctionFail(event) { - const error = get(event, 'exception.Error', get(event, 'error.Error')); - const cause = get(event, 'exception.Cause', get(event, 'error.Cause')); - if (error) { - if (errors[error]) { - throw new errors[error](cause); - } - else if (error === 'TypeError') { - throw new TypeError(cause); - } - throw new Error(cause); - } + const error = event.exception || event.error; + + if (error) throw buildError(error.Error, error.Cause); throw new Error('Step Function failed for an unknown reason.'); } @@ -64,17 +70,18 @@ function makeLambdaFunctionFail(event) { * @param {string} event.config.bucket - S3 bucket * @param {string} event.config.stateMachine - current state machine * @param {string} event.config.executionTime - execution time - * @returns {Promise.} - AWS SNS response. see schemas/output.json for detailed output - * schema that is passed to the next task in the workflow + * @returns {Promise.} - AWS SNS response or error in case of step function + * failure. */ async function publishSnsMessage(event) { - const config = get(event, 'config', []); - const message = get(event, 'input', []); + const config = get(event, 'config'); + const message = get(event, 'input'); const finished = get(config, 'sfnEnd', false); const topicArn = get(message, 'meta.topic_arn', null); const failed = eventFailed(message); + let response = {}; if (topicArn) { // if this is the sns call at the end of the execution if (finished) { @@ -83,10 +90,10 @@ async function publishSnsMessage(event) { if (granuleId) { await setGranuleStatus( granuleId, - get(config, 'stack', null), - get(config, 'bucket', null), - get(config, 'stateMachine', null), - get(config, 'executionName', null), + get(config, 'stack'), + get(config, 'bucket'), + get(config, 'stateMachine'), + get(config, 'executionName'), message.meta.status ); } @@ -95,7 +102,7 @@ async function publishSnsMessage(event) { message.meta.status = 'running'; } - await sns().publish({ + response = await sns().publish({ TopicArn: topicArn, Message: JSON.stringify(message) }).promise(); @@ -105,7 +112,7 @@ async function publishSnsMessage(event) { makeLambdaFunctionFail(message); } - return message; + return response; } exports.publishSnsMessage = publishSnsMessage; @@ -116,11 +123,9 @@ exports.publishSnsMessage = publishSnsMessage; * @param {Object} event - a Cumulus Message * @param {Object} context - an AWS Lambda context object * @param {Function} callback - an AWS Lambda call back - * @returns {Promise} updated event object + * @returns {undefined} - does not return a value */ function handler(event, context, callback) { - return StepFunction.pullEvent(event).then((message) => { - cumulusMessageAdapter.runCumulusTask(publishSnsMessage, message, context, callback); - }); + cumulusMessageAdapter.runCumulusTask(publishSnsMessage, event, context, callback); } exports.handler = handler; diff --git a/cumulus/tasks/sf-sns-report/schemas/config.json b/cumulus/tasks/sf-sns-report/schemas/config.json index 7ab81fe80ea..fc5e02565e7 100644 --- a/cumulus/tasks/sf-sns-report/schemas/config.json +++ b/cumulus/tasks/sf-sns-report/schemas/config.json @@ -24,5 +24,13 @@ "description": "execution time (from cumulus_meta.execution_name). Required when sfnEnd is true and granule status is reported.", "type": "string" } - } + }, + "oneOf": [ + { + "required": ["stack", "bucket", "stateMachine", "executionTime"] + }, + { + "required": [] + } + ] } diff --git a/cumulus/tasks/sf-sns-report/tests/index.js b/cumulus/tasks/sf-sns-report/tests/index.js index 94da140047f..d34c7282b0a 100644 --- a/cumulus/tasks/sf-sns-report/tests/index.js +++ b/cumulus/tasks/sf-sns-report/tests/index.js @@ -3,7 +3,7 @@ const test = require('ava'); const { recursivelyDeleteS3Bucket, s3 } = require('@cumulus/common/aws'); const { publishSnsMessage } = require('../index'); -const { cloneDeep } = require('lodash'); +const { cloneDeep, get } = require('lodash'); const { randomString } = require('@cumulus/common/test-utils'); test('send report when sfn is running', (t) => { @@ -16,8 +16,7 @@ test('send report when sfn is running', (t) => { return publishSnsMessage(cloneDeep(event)) .then((output) => { - event.input.meta.status = 'running'; - t.deepEqual(output, event.input); + t.not(get(output, 'MessageId', null)); }); }); @@ -39,13 +38,13 @@ test('send report when sfn is running with exception', (t) => { }); }); -test('send report when sfn is running with error', (t) => { +test('send report when sfn is running with TypeError', (t) => { const event = { input: { meta: { topic_arn: 'test_topic_arn' }, error: { Error: 'TypeError', - Cause: 'bucket not found' + Cause: 'resource not found' }, anykey: 'anyvalue' } @@ -57,6 +56,23 @@ test('send report when sfn is running with error', (t) => { }); }); +test('send report when sfn is running with known error type', (t) => { + const event = { + input: { + meta: { topic_arn: 'test_topic_arn' }, + error: { + Error: 'PDRParsingError', + Cause: 'format error' + }, + anykey: 'anyvalue' + } + }; + + return publishSnsMessage(cloneDeep(event)) + .catch((e) => { + t.is(e.message, event.input.error.Cause); + }); +}); test('send report when sfn is finished and granule has succeeded', async (t) => { const input = { @@ -79,8 +95,7 @@ test('send report when sfn is finished and granule has succeeded', async (t) => await s3().createBucket({ Bucket: event.config.bucket }).promise(); return publishSnsMessage(cloneDeep(event)) .then((output) => { - event.input.meta.status = 'completed'; - t.deepEqual(output, event.input); + t.not(get(output, 'MessageId', null)); }) .then(() => recursivelyDeleteS3Bucket(event.config.bucket)) .catch(() => recursivelyDeleteS3Bucket(event.config.bucket).then(t.fail)); From a9c446d45a92e2026c418a1adb4e2d0a7ecf8d7a Mon Sep 17 00:00:00 2001 From: Jenny Liu Date: Thu, 15 Mar 2018 09:36:30 -0400 Subject: [PATCH 9/9] more fix --- cumulus/tasks/sf-sns-report/README.md | 2 +- cumulus/tasks/sf-sns-report/index.js | 10 +++++----- cumulus/tasks/sf-sns-report/schemas/config.json | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cumulus/tasks/sf-sns-report/README.md b/cumulus/tasks/sf-sns-report/README.md index bb0d882e777..3461c4a83bf 100644 --- a/cumulus/tasks/sf-sns-report/README.md +++ b/cumulus/tasks/sf-sns-report/README.md @@ -34,7 +34,7 @@ To report the final status of the step function: stack: '{$.meta.stack}' bucket: '{$.meta.buckets.internal}' stateMachine: '{$.cumulus_meta.state_machine}' - executionTime: '{$.cumulus_meta.execution_name}' + executionName: '{$.cumulus_meta.execution_name}' cumulus_message: input: '{$}' ResultPath: null diff --git a/cumulus/tasks/sf-sns-report/index.js b/cumulus/tasks/sf-sns-report/index.js index 772e163f26f..c30341e1102 100644 --- a/cumulus/tasks/sf-sns-report/index.js +++ b/cumulus/tasks/sf-sns-report/index.js @@ -69,7 +69,7 @@ function makeLambdaFunctionFail(event) { * @param {string} event.config.stack - the name of the deployment stack * @param {string} event.config.bucket - S3 bucket * @param {string} event.config.stateMachine - current state machine - * @param {string} event.config.executionTime - execution time + * @param {string} event.config.executionName - execution name * @returns {Promise.} - AWS SNS response or error in case of step function * failure. */ @@ -90,10 +90,10 @@ async function publishSnsMessage(event) { if (granuleId) { await setGranuleStatus( granuleId, - get(config, 'stack'), - get(config, 'bucket'), - get(config, 'stateMachine'), - get(config, 'executionName'), + config.stack, + config.bucket, + config.stateMachine, + config.executionName, message.meta.status ); } diff --git a/cumulus/tasks/sf-sns-report/schemas/config.json b/cumulus/tasks/sf-sns-report/schemas/config.json index fc5e02565e7..588d3df1dae 100644 --- a/cumulus/tasks/sf-sns-report/schemas/config.json +++ b/cumulus/tasks/sf-sns-report/schemas/config.json @@ -20,14 +20,14 @@ "description": "current state machine (from cumulus_meta.state_machine). Required when sfnEnd is true and granule status is reported.", "type": "string" }, - "executionTime": { - "description": "execution time (from cumulus_meta.execution_name). Required when sfnEnd is true and granule status is reported.", + "executionName": { + "description": "execution name (from cumulus_meta.execution_name). Required when sfnEnd is true and granule status is reported.", "type": "string" } }, "oneOf": [ { - "required": ["stack", "bucket", "stateMachine", "executionTime"] + "required": ["stack", "bucket", "stateMachine", "executionName"] }, { "required": []