diff --git a/CHANGELOG.md b/CHANGELOG.md index 070a1566bdf..92bef90e08a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] +- **CUMULUS-304: "Add AWS API throttling to pdr-status-check task"** Added concurrency limit on SFN API calls. The default concurrency is 10 and is configurable through Lambda environment variable CONCURRENCY. ## [v1.2.0] - 2018-03-20 diff --git a/cumulus/tasks/pdr-status-check/README.md b/cumulus/tasks/pdr-status-check/README.md index 3f31041f437..273d629d841 100644 --- a/cumulus/tasks/pdr-status-check/README.md +++ b/cumulus/tasks/pdr-status-check/README.md @@ -3,6 +3,7 @@ [![CircleCI](https://circleci.com/gh/cumulus-nasa/cumulus.svg?style=svg)](https://circleci.com/gh/cumulus-nasa/cumulus) Lambda function handler for checking the status of a workflow (step function) execution. Expects a payload object which includes the name of a PDR. +The concurrency of SFN API calls is set to 10 by default, and it's configurable by setting the Lambda environment variable CONCURRENCY. ## What is Cumulus? diff --git a/cumulus/tasks/pdr-status-check/index.js b/cumulus/tasks/pdr-status-check/index.js index 74847119843..d9eea082247 100644 --- a/cumulus/tasks/pdr-status-check/index.js +++ b/cumulus/tasks/pdr-status-check/index.js @@ -5,6 +5,7 @@ const aws = require('@cumulus/common/aws'); const { IncompleteError } = require('@cumulus/common/errors'); const log = require('@cumulus/common/log'); const { justLocalRun } = require('@cumulus/common/local-helpers'); +const pLimit = require('p-limit'); // The default number of times to re-check for completion const defaultRetryLimit = 30; @@ -130,6 +131,21 @@ function buildOutput(event, groupedExecutions) { return output; } +/** + * check the status of a step funciton execution + * + * @param {string} executionArn - step function execution arn + * @returns {Promise.} - an object describing the status of the exection + */ +function describeExecutionStatus(executionArn) { + return aws.sfn().describeExecution({ executionArn }).promise() + .catch((e) => { + if (e.code === 'ExecutionDoesNotExist') { + return { executionArn: executionArn, status: 'RUNNING' }; + } + throw e; + }); +} /** * Checks a list of Step Function Executions to see if they are all in * terminal states. @@ -141,34 +157,28 @@ function buildOutput(event, groupedExecutions) { */ async function checkPdrStatuses(event) { const runningExecutionArns = event.input.running || []; + const concurrencyLimit = process.env.CONCURRENCY || 10; + const limit = pLimit(concurrencyLimit); - const executions = []; - for (const executionArn of runningExecutionArns) { - try { - const execution = await aws.sfn().describeExecution({ executionArn }).promise(); - executions.push(execution); - } - catch (e) { - // it's ok if a execution is still in the queue and has not be executed - if (e.code === 'ExecutionDoesNotExist') { - executions.push({ executionArn: executionArn, status: 'RUNNING' }); - } - else throw e; - } - } + const promisedExecutionDescriptions = runningExecutionArns.map((executionArn) => + limit(() => describeExecutionStatus(executionArn))); - const groupedExecutions = groupExecutionsByStatus(executions); - const counter = getCounterFromEvent(event) + 1; - const exceededLimit = counter >= getLimitFromEvent(event); + return Promise.all(promisedExecutionDescriptions) + .then(groupExecutionsByStatus) + .then((groupedExecutions) => { + const counter = getCounterFromEvent(event) + 1; + const exceededLimit = counter >= getLimitFromEvent(event); - const executionsAllDone = groupedExecutions.running.length === 0; - if (!executionsAllDone && exceededLimit) { - throw new IncompleteError(`PDR didn't complete after ${counter} checks`); - } + const executionsAllDone = groupedExecutions.running.length === 0; - const output = buildOutput(event, groupedExecutions); - if (!output.isFinished) logStatus(output); - return output; + if (!executionsAllDone && exceededLimit) { + throw new IncompleteError(`PDR didn't complete after ${counter} checks`); + } + + const output = buildOutput(event, groupedExecutions); + if (!output.isFinished) logStatus(output); + return output; + }); } exports.checkPdrStatuses = checkPdrStatuses; diff --git a/cumulus/tasks/pdr-status-check/package.json b/cumulus/tasks/pdr-status-check/package.json index bace424943f..74aa7aa4e83 100644 --- a/cumulus/tasks/pdr-status-check/package.json +++ b/cumulus/tasks/pdr-status-check/package.json @@ -52,13 +52,16 @@ "babel-preset-es2017": "^6.24.1", "json-loader": "^0.5.7", "lodash.get": "^4.4.2", + "p-limit": "^1.1.0", "webpack": "^1.12.13" }, "devDependencies": { "@ava/babel-preset-stage-4": "^1.1.0", "@ava/babel-preset-transform-test-files": "^3.0.0", "ava": "^0.23.0", + "delay": "^2.0.0", "lodash": "^4.17.5", + "moment": "^2.21.0", "proxyquire": "^1.8.0", "sinon": "^2.0.0-pre.5" } diff --git a/cumulus/tasks/pdr-status-check/tests/index.js b/cumulus/tasks/pdr-status-check/tests/index.js index 4c8edc8f416..c4cc285e423 100644 --- a/cumulus/tasks/pdr-status-check/tests/index.js +++ b/cumulus/tasks/pdr-status-check/tests/index.js @@ -3,8 +3,11 @@ const _ = require('lodash'); const test = require('ava'); const sinon = require('sinon'); +const delay = require('delay'); +const moment = require('moment'); const aws = require('@cumulus/common/aws'); const { checkPdrStatuses } = require('../index'); +const { randomString } = require('@cumulus/common/test-utils'); test('valid output when no running executions', (t) => { const event = { @@ -122,3 +125,40 @@ test('returns the correct results in the nominal case', (t) => { }); }); }); + +test('test concurrency limit setting on sfn api calls', (t) => { + _.set(process, 'env.CONCURRENCY', 20); + const stubSfnClient = { + describeExecution: ({ executionArn }) => ({ + promise: () => delay(100) + .then(() => Promise.resolve({ executionArn, status: 'SUCCEEDED' })) + }) + }; + const stub = sinon.stub(aws, 'sfn').returns(stubSfnClient); + + const running = []; + const uuid = randomString(); + for (let i = 0; i < 200; i++) running[i] = `${uuid}:${i}`; + + const event = { + input: { + running, + pdr: { name: 'test.PDR', path: 'test-path' } + } + }; + const startTime = moment(); + return checkPdrStatuses(event) + .then((output) => { + stub.restore(); + + t.true(output.isFinished); + + // the sf api execution time would be approximately: + // ((# of executions)/(# of concurrency) ) * (function execution time) + // (200/20) * 100 = 1000 + // add 100ms for other operations + const endTime = moment(); + const timeEscaped = moment.duration(endTime.diff(startTime)).as('milliseconds'); + t.true(timeEscaped >= 900 && timeEscaped <= 1100); + }); +});