-
Notifications
You must be signed in to change notification settings - Fork 106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update queue-pdrs task to use the message adapter #228
Changes from 2 commits
c1e57db
f683bea
dbdcdbc
5127320
85a3810
83b8e41
e7a9bc8
8f1780c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,42 +1,34 @@ | ||
'use strict'; | ||
|
||
import { queuePdr } from '@cumulus/ingest/queue'; | ||
const log = require('@cumulus/common/log'); | ||
const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js'); | ||
const { enqueueParsePdrMessage } = require('@cumulus/ingest/queue'); | ||
|
||
/** | ||
* Callback function provided by aws lambda. See https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-handler.html#nodejs-prog-model-handler-callback | ||
* @callback lambdaCallback | ||
* @param {object} error | ||
* @param {object} output - output object matching schemas/output.json | ||
* @param {integer} output.pdrs_queued | ||
*/ | ||
|
||
/** | ||
* For each PDR, generate a new SF messages send to the step function queue to be executed | ||
* @param {object} event lambda event object | ||
* @param {object} event.input | ||
* @param {array} event.input.pdrs | ||
* @param {object} context Lambda context object. See https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-context.html | ||
* @param {lambdaCallback} callback callback function | ||
* @return {undefined} | ||
**/ | ||
function handler(event, context, cb) { | ||
async function queuePdrs(event) { | ||
const pdrs = event.input.pdrs || []; | ||
const config = event.config; | ||
const queuedPdrs = pdrs.map((pdr) => queuePdr( | ||
config.queueUrl, | ||
config.templateUri, | ||
config.provider, | ||
config.collection, | ||
pdr | ||
)); | ||
|
||
return Promise.all(queuedPdrs).then(() => { | ||
cb(null, { pdrs_queued: queuedPdrs.length }); | ||
}).catch(e => { | ||
log.error(e); | ||
return cb(e); | ||
}); | ||
await Promise.all( | ||
pdrs.map((pdr) => enqueueParsePdrMessage( | ||
pdr, | ||
event.config.queueUrl, | ||
event.config.parsePdrMessageTemplateUri, | ||
event.config.provider, | ||
event.config.collection | ||
)) | ||
); | ||
|
||
return { pdrs_queued: pdrs.length }; | ||
} | ||
exports.queuePdrs = queuePdrs; | ||
|
||
module.exports.handler = handler; | ||
/** | ||
* Lambda handler | ||
* | ||
* @param {Object} event - a Cumulus Message | ||
* @param {Object} context - an AWS Lambda context | ||
* @param {Function} callback - an AWS Lambda handler | ||
* @returns {undefined} - does not return a value | ||
*/ | ||
function handler(event, context, callback) { | ||
cumulusMessageAdapter.runCumulusTask(queuePdrs, event, context, callback); | ||
} | ||
exports.handler = handler; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ | |
"license": "Apache-2.0", | ||
"dependencies": { | ||
"@cumulus/common": "^1.0.1", | ||
"@cumulus/cumulus-message-adapter-js": "0.0.1-beta.3", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably have this on 1.0.0 now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
"@cumulus/ingest": "^1.0.1", | ||
"babel-core": "^6.25.0", | ||
"babel-loader": "^6.2.4", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
{ | ||
"title": "QueuePdrsConfig", | ||
"description": "Describes the config used by the queue-pdrs task", | ||
"type": "object", | ||
"required": [ | ||
"collection", | ||
"provider", | ||
"queueUrl", | ||
"parsePdrMessageTemplateUri" | ||
], | ||
"additionalProperties": false, | ||
"properties": { | ||
"collection": { "type": "object" }, | ||
"provider": { "type": "object" }, | ||
"queueUrl": { "type": "string" }, | ||
"parsePdrMessageTemplateUri": { "type": "string" } | ||
} | ||
} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{ | ||
"rules": { | ||
"no-param-reassign": "off" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,53 +1,170 @@ | ||
/* eslint-disable no-param-reassign */ | ||
'use strict'; | ||
|
||
const test = require('ava'); | ||
const MockAWS = require('@mapbox/mock-aws-sdk-js'); | ||
|
||
const { s3, sqs, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws'); | ||
const { createQueue, randomString } = require('@cumulus/common/test-utils'); | ||
const { | ||
createQueue, | ||
randomString, | ||
validateConfig, | ||
validateInput, | ||
validateOutput | ||
} = require('@cumulus/common/test-utils'); | ||
|
||
const { handler } = require('../index'); | ||
const inputJSON = require('./fixtures/input.json'); | ||
const workflowTemplate = require('./fixtures/workflow-template.json'); | ||
|
||
const aws = require('@cumulus/common/aws'); | ||
const { queuePdrs } = require('../index'); | ||
|
||
test.beforeEach(async (t) => { | ||
t.context.queueUrl = await createQueue(); | ||
t.context.templateBucket = randomString(); | ||
await s3().createBucket({ Bucket: t.context.templateBucket }).promise(); | ||
|
||
t.context.stateMachineArn = randomString(); | ||
|
||
t.context.messageTemplate = { | ||
cumulus_meta: { | ||
state_machine: t.context.stateMachineArn | ||
}, | ||
meta: {} | ||
}; | ||
const messageTemplateKey = `${randomString()}/template.json`; | ||
await s3().putObject({ | ||
Bucket: t.context.templateBucket, | ||
Key: messageTemplateKey, | ||
Body: JSON.stringify(t.context.messageTemplate) | ||
}).promise(); | ||
|
||
t.context.event = { | ||
config: { | ||
collection: { name: 'collection-name' }, | ||
provider: { name: 'provider-name' }, | ||
queueUrl: await createQueue(), | ||
parsePdrMessageTemplateUri: `s3://${t.context.templateBucket}/${messageTemplateKey}` | ||
}, | ||
input: { | ||
pdrs: [] | ||
} | ||
}; | ||
}); | ||
|
||
test.afterEach(async (t) => { | ||
await Promise.all([ | ||
recursivelyDeleteS3Bucket(t.context.templateBucket), | ||
sqs().deleteQueue({ QueueUrl: t.context.event.config.queueUrl }).promise() | ||
]); | ||
}); | ||
|
||
test('The correct output is returned when PDRs are queued', async (t) => { | ||
const event = t.context.event; | ||
event.input.pdrs = [ | ||
{ name: randomString(), path: randomString() }, | ||
{ name: randomString(), path: randomString() } | ||
]; | ||
|
||
await validateConfig(t, event.config); | ||
await validateInput(t, event.input); | ||
|
||
const output = await queuePdrs(event); | ||
|
||
await validateOutput(t, output); | ||
t.deepEqual(output, { pdrs_queued: 2 }); | ||
}); | ||
|
||
test('The correct output is returned when no PDRs are queued', async (t) => { | ||
const event = t.context.event; | ||
event.input.pdrs = []; | ||
|
||
await validateConfig(t, event.config); | ||
await validateInput(t, event.input); | ||
|
||
t.context.bucket = randomString(); | ||
return s3().createBucket({ Bucket: t.context.bucket }).promise(); | ||
const output = await queuePdrs(event); | ||
|
||
await validateOutput(t, output); | ||
t.deepEqual(output, { pdrs_queued: 0 }); | ||
}); | ||
|
||
test.afterEach.always((t) => | ||
Promise.all([ | ||
recursivelyDeleteS3Bucket(t.context.bucket), | ||
sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise() | ||
])); | ||
test('PDRs are added to the queue', async (t) => { | ||
const event = t.context.event; | ||
event.input.pdrs = [ | ||
{ name: randomString(), path: randomString() }, | ||
{ name: randomString(), path: randomString() } | ||
]; | ||
|
||
await validateConfig(t, event.config); | ||
await validateInput(t, event.input); | ||
|
||
test('queue pdrs', async (t) => { | ||
const Bucket = t.context.bucket; | ||
const ParsePdrTemplate = `s3://${Bucket}/dev/workflows/ParsePdr.json`; | ||
const output = await queuePdrs(event); | ||
|
||
await aws.s3().putObject({ | ||
Bucket, | ||
Key: 'dev/workflows/ParsePdr.json', | ||
Body: JSON.stringify(workflowTemplate) | ||
await validateOutput(t, output); | ||
|
||
// Get messages from the queue | ||
const receiveMessageResponse = await sqs().receiveMessage({ | ||
QueueUrl: t.context.event.config.queueUrl, | ||
MaxNumberOfMessages: 10, | ||
WaitTimeSeconds: 1 | ||
}).promise(); | ||
const messages = receiveMessageResponse.Messages; | ||
|
||
MockAWS.stub('StepFunctions', 'describeExecution').returns({ | ||
promise: () => Promise.resolve({}) | ||
}); | ||
t.is(messages.length, 2); | ||
}); | ||
|
||
test('The correct message is enqueued', async (t) => { | ||
const event = t.context.event; | ||
event.input.pdrs = [ | ||
{ | ||
name: randomString(), | ||
path: randomString() | ||
}, | ||
{ | ||
name: randomString(), | ||
path: randomString() | ||
}, | ||
]; | ||
|
||
await validateConfig(t, event.config); | ||
await validateInput(t, event.input); | ||
|
||
const output = await queuePdrs(event); | ||
|
||
const input = Object.assign({}, inputJSON); | ||
input.config.templateUri = ParsePdrTemplate; | ||
input.config.queueUrl = t.context.queueUrl; | ||
await validateOutput(t, output); | ||
|
||
return handler(input, {}, (e, output) => { | ||
t.ifError(e); | ||
t.is(typeof output, 'object'); | ||
t.is(output.pdrs_queued, 2); | ||
MockAWS.StepFunctions.restore(); | ||
// Get messages from the queue | ||
const receiveMessageResponse = await sqs().receiveMessage({ | ||
QueueUrl: t.context.event.config.queueUrl, | ||
MaxNumberOfMessages: 10, | ||
WaitTimeSeconds: 1 | ||
}).promise(); | ||
const messages = receiveMessageResponse.Messages.map((message) => JSON.parse(message.Body)); | ||
|
||
t.is(messages.length, 2); | ||
|
||
const receivedPdrnames = messages.map((message) => message.payload.pdr.name); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: (Here and elsewhere in this pr)
https://github.com/airbnb/javascript#arrows--one-arg-parens But this is not consistent across the repo, so I don't feel strongly about leaving as is. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Airbnb's guide says to omit the braces, but our project style guide overrides that: https://github.com/cumulus-nasa/cumulus/blob/master/.eslintrc.json#L95 |
||
event.input.pdrs.map((pdr) => pdr.name).forEach((pdrName) => | ||
t.true(receivedPdrnames.includes(pdrName))); | ||
|
||
// Figure out what messages we should have received for each PDR | ||
const expectedMessages = {}; | ||
event.input.pdrs.forEach((pdr) => { | ||
expectedMessages[pdr.name] = { | ||
cumulus_meta: { | ||
state_machine: t.context.stateMachineArn | ||
}, | ||
meta: { | ||
collection: { name: 'collection-name' }, | ||
provider: { name: 'provider-name' } | ||
}, | ||
payload: { | ||
pdr: { | ||
name: pdr.name, | ||
path: pdr.path | ||
} | ||
} | ||
}; | ||
}); | ||
|
||
// Make sure we did receive those messages | ||
messages.forEach((message) => { | ||
const pdrName = message.payload.pdr.name; | ||
t.deepEqual(message, expectedMessages[pdrName]); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These tests are 💯 |
||
}); | ||
}); | ||
|
||
test.todo('An appropriate error is thrown if the message template could not be fetched'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a ticket for this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is now - https://bugs.earthdata.nasa.gov/browse/CUMULUS-368 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function needs a jsdoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.