-
Notifications
You must be signed in to change notification settings - Fork 106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CUMULUS-304: Add AWS API throttling to pdr-status-check task #256
Changes from 5 commits
e67ef8d
2a8ded9
e909624
d50d17c
106489c
d27f418
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ const aws = require('@cumulus/common/aws'); | |
const { IncompleteError } = require('@cumulus/common/errors'); | ||
const log = require('@cumulus/common/log'); | ||
const { justLocalRun } = require('@cumulus/common/local-helpers'); | ||
const pLimit = require('p-limit'); | ||
|
||
// The default number of times to re-check for completion | ||
const defaultRetryLimit = 30; | ||
|
@@ -130,6 +131,21 @@ function buildOutput(event, groupedExecutions) { | |
return output; | ||
} | ||
|
||
/** | ||
* check the status of a step funciton execution | ||
* | ||
* @param {string} executionArn - step function execution arn | ||
* @returns {Promise.<Object>} - an object describing the status of the exection | ||
*/ | ||
function describeExecutionStatus(executionArn) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if this matters but this is kind of duplicating functionality we have in https://github.com/cumulus-nasa/cumulus/blob/master/packages/api/endpoints/execution-status.js - we could think about putting this in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But it might be we want different enough behavior that keeping them separate makes sense There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks similar, but combining them doesn't seem save much code. |
||
return aws.sfn().describeExecution({ executionArn }).promise() | ||
.catch((e) => { | ||
if (e.code === 'ExecutionDoesNotExist') { | ||
return { executionArn: executionArn, status: 'RUNNING' }; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you explain this to me? If we get an error There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The granule stats in the pdr have the following lists: running, completed, failed. Initially, the PDR queues these granules and puts the executions to running list. So, the executions which haven't been executed are still belong to running list. |
||
} | ||
throw e; | ||
}); | ||
} | ||
/** | ||
* Checks a list of Step Function Executions to see if they are all in | ||
* terminal states. | ||
|
@@ -141,34 +157,28 @@ function buildOutput(event, groupedExecutions) { | |
*/ | ||
async function checkPdrStatuses(event) { | ||
const runningExecutionArns = event.input.running || []; | ||
const concurrencyLimit = process.env.CONCURRENCY || 10; | ||
const limit = pLimit(concurrencyLimit); | ||
|
||
const executions = []; | ||
for (const executionArn of runningExecutionArns) { | ||
try { | ||
const execution = await aws.sfn().describeExecution({ executionArn }).promise(); | ||
executions.push(execution); | ||
} | ||
catch (e) { | ||
// it's ok if a execution is still in the queue and has not be executed | ||
if (e.code === 'ExecutionDoesNotExist') { | ||
executions.push({ executionArn: executionArn, status: 'RUNNING' }); | ||
} | ||
else throw e; | ||
} | ||
} | ||
const promisedExecutionDescriptions = runningExecutionArns.map((executionArn) => | ||
limit(() => describeExecutionStatus(executionArn))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is my understanding correct that this limits concurrency to 10 such that if there are 100 running executions, it would only fetch 10 statuses at a time? Related question - is there any retry functionality built into this? Like if one of the connections fails we retry a request for the execution arn? Not required for this fix but just curious. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, only 10 sfn.DescribeExecution can be run at a time. |
||
|
||
const groupedExecutions = groupExecutionsByStatus(executions); | ||
const counter = getCounterFromEvent(event) + 1; | ||
const exceededLimit = counter >= getLimitFromEvent(event); | ||
return Promise.all(promisedExecutionDescriptions) | ||
.then(groupExecutionsByStatus) | ||
.then((groupedExecutions) => { | ||
const counter = getCounterFromEvent(event) + 1; | ||
const exceededLimit = counter >= getLimitFromEvent(event); | ||
|
||
const executionsAllDone = groupedExecutions.running.length === 0; | ||
if (!executionsAllDone && exceededLimit) { | ||
throw new IncompleteError(`PDR didn't complete after ${counter} checks`); | ||
} | ||
const executionsAllDone = groupedExecutions.running.length === 0; | ||
|
||
const output = buildOutput(event, groupedExecutions); | ||
if (!output.isFinished) logStatus(output); | ||
return output; | ||
if (!executionsAllDone && exceededLimit) { | ||
throw new IncompleteError(`PDR didn't complete after ${counter} checks`); | ||
} | ||
|
||
const output = buildOutput(event, groupedExecutions); | ||
if (!output.isFinished) logStatus(output); | ||
return output; | ||
}); | ||
} | ||
exports.checkPdrStatuses = checkPdrStatuses; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,8 +3,11 @@ | |
const _ = require('lodash'); | ||
const test = require('ava'); | ||
const sinon = require('sinon'); | ||
const delay = require('delay'); | ||
const moment = require('moment'); | ||
const aws = require('@cumulus/common/aws'); | ||
const { checkPdrStatuses } = require('../index'); | ||
const { randomString } = require('@cumulus/common/test-utils'); | ||
|
||
test('valid output when no running executions', (t) => { | ||
const event = { | ||
|
@@ -122,3 +125,40 @@ test('returns the correct results in the nominal case', (t) => { | |
}); | ||
}); | ||
}); | ||
|
||
test('test concurrency limit setting on sfn api calls', (t) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice test! |
||
_.set(process, 'env.CONCURRENCY', 20); | ||
const stubSfnClient = { | ||
describeExecution: ({ executionArn }) => ({ | ||
promise: () => delay(100) | ||
.then(() => Promise.resolve({ executionArn, status: 'SUCCEEDED' })) | ||
}) | ||
}; | ||
const stub = sinon.stub(aws, 'sfn').returns(stubSfnClient); | ||
|
||
const running = []; | ||
const uuid = randomString(); | ||
for (let i = 0; i < 200; i++) running[i] = `${uuid}:${i}`; | ||
|
||
const event = { | ||
input: { | ||
running, | ||
pdr: { name: 'test.PDR', path: 'test-path' } | ||
} | ||
}; | ||
const startTime = moment(); | ||
return checkPdrStatuses(event) | ||
.then((output) => { | ||
stub.restore(); | ||
|
||
t.true(output.isFinished); | ||
|
||
// the sf api execution time would be approximately: | ||
// ((# of executions)/(# of concurrency) ) * (function execution time) | ||
// (200/20) * 100 = 1000 | ||
// add 100ms for other operations | ||
const endTime = moment(); | ||
const timeEscaped = moment.duration(endTime.diff(startTime)).as('milliseconds'); | ||
t.true(timeEscaped >= 900 && timeEscaped <= 1100); | ||
}); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is part of the 1.2.0 release: https://github.com/cumulus-nasa/cumulus/pull/256/files#diff-4ac32a78649ca5bdd8e0ba38b7006a1eR21
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed duplicate entry.