Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update queue-pdrs task to use the message adapter #228

Merged
merged 8 commits into from
Mar 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,22 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]

### Added
- Integration test package with command line tool [CUMULUS-200] by @laurenfrederick
- Added a `jlog` function to `common/test-utils` to aid in test debugging
- Integration test package with command line tool [CUMULUS-200] by @laurenfrederick

### Updated
- The `queue-pdrs` task now uses the [cumulus-message-adapter-js](https://github.com/cumulus-nasa/cumulus-message-adapter-js)
library
- Updated the `queue-pdrs` JSON schemas
- The test-utils schema validation functions now throw an error if validation
fails
- The `queue-granules` task now uses the [cumulus-message-adapter-js](https://github.com/cumulus-nasa/cumulus-message-adapter-js)
library
- Updated the `queue-granules` JSON schemas

### Removed
- Removed the `getSfnExecutionByName` function from `common/aws`
- Removed the `getGranuleStatus` function from `common/aws`

## [v1.0.1] - 2018-02-27

Expand Down
65 changes: 32 additions & 33 deletions cumulus/tasks/queue-pdrs/index.js
Original file line number Diff line number Diff line change
@@ -1,42 +1,41 @@
'use strict';

import { queuePdr } from '@cumulus/ingest/queue';
const log = require('@cumulus/common/log');
const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js');
const { enqueueParsePdrMessage } = require('@cumulus/ingest/queue');

/**
* Callback function provided by aws lambda. See https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-handler.html#nodejs-prog-model-handler-callback
* @callback lambdaCallback
* @param {object} error
* @param {object} output - output object matching schemas/output.json
* @param {integer} output.pdrs_queued
*/

/**
* For each PDR, generate a new SF messages send to the step function queue to be executed
* @param {object} event lambda event object
* @param {object} event.input
* @param {array} event.input.pdrs
* @param {object} context Lambda context object. See https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-context.html
* @param {lambdaCallback} callback callback function
* @return {undefined}
* See schemas/input.json and schemas/config.json for detailed event description
*
* @param {Object} event - Lambda event object
* @returns {Promise} - see schemas/output.json for detailed output schema
* that is passed to the next task in the workflow
**/
function handler(event, context, cb) {
async function queuePdrs(event) {
const pdrs = event.input.pdrs || [];
const config = event.config;
const queuedPdrs = pdrs.map((pdr) => queuePdr(
config.queueUrl,
config.templateUri,
config.provider,
config.collection,
pdr
));

return Promise.all(queuedPdrs).then(() => {
cb(null, { pdrs_queued: queuedPdrs.length });
}).catch(e => {
log.error(e);
return cb(e);
});
await Promise.all(
pdrs.map((pdr) => enqueueParsePdrMessage(
pdr,
event.config.queueUrl,
event.config.parsePdrMessageTemplateUri,
event.config.provider,
event.config.collection
))
);

return { pdrs_queued: pdrs.length };
}
exports.queuePdrs = queuePdrs;

module.exports.handler = handler;
/**
* Lambda handler
*
* @param {Object} event - a Cumulus Message
* @param {Object} context - an AWS Lambda context
* @param {Function} callback - an AWS Lambda handler
* @returns {undefined} - does not return a value
*/
function handler(event, context, callback) {
cumulusMessageAdapter.runCumulusTask(queuePdrs, event, context, callback);
}
exports.handler = handler;
1 change: 1 addition & 0 deletions cumulus/tasks/queue-pdrs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"license": "Apache-2.0",
"dependencies": {
"@cumulus/common": "^1.0.1",
"@cumulus/cumulus-message-adapter-js": "0.0.1-beta.3",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have this on 1.0.0 now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"@cumulus/ingest": "^1.0.1",
"babel-core": "^6.25.0",
"babel-loader": "^6.2.4",
Expand Down
18 changes: 18 additions & 0 deletions cumulus/tasks/queue-pdrs/schemas/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"title": "QueuePdrsConfig",
"description": "Describes the config used by the queue-pdrs task",
"type": "object",
"required": [
"collection",
"provider",
"queueUrl",
"parsePdrMessageTemplateUri"
],
"additionalProperties": false,
"properties": {
"collection": { "type": "object" },
"provider": { "type": "object" },
"queueUrl": { "type": "string" },
"parsePdrMessageTemplateUri": { "type": "string" }
}
}
41 changes: 0 additions & 41 deletions cumulus/tasks/queue-pdrs/schemas/config.json.txt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
"title": "QueuePdrsOutput",
"description": "Describes the output produced by the queue-pdrs task",
"type": "object",
"required": [ "pdrs_queued" ],
"additionalProperties": false,
"properties": {
"pdrs_queued": {
"type": "integer"
Expand Down
5 changes: 5 additions & 0 deletions cumulus/tasks/queue-pdrs/tests/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"rules": {
"no-param-reassign": "off"
}
}
185 changes: 151 additions & 34 deletions cumulus/tasks/queue-pdrs/tests/index.js
Original file line number Diff line number Diff line change
@@ -1,53 +1,170 @@
/* eslint-disable no-param-reassign */
'use strict';

const test = require('ava');
const MockAWS = require('@mapbox/mock-aws-sdk-js');

const { s3, sqs, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws');
const { createQueue, randomString } = require('@cumulus/common/test-utils');
const {
createQueue,
randomString,
validateConfig,
validateInput,
validateOutput
} = require('@cumulus/common/test-utils');

const { handler } = require('../index');
const inputJSON = require('./fixtures/input.json');
const workflowTemplate = require('./fixtures/workflow-template.json');

const aws = require('@cumulus/common/aws');
const { queuePdrs } = require('../index');

test.beforeEach(async (t) => {
t.context.queueUrl = await createQueue();
t.context.templateBucket = randomString();
await s3().createBucket({ Bucket: t.context.templateBucket }).promise();

t.context.stateMachineArn = randomString();

t.context.messageTemplate = {
cumulus_meta: {
state_machine: t.context.stateMachineArn
},
meta: {}
};
const messageTemplateKey = `${randomString()}/template.json`;
await s3().putObject({
Bucket: t.context.templateBucket,
Key: messageTemplateKey,
Body: JSON.stringify(t.context.messageTemplate)
}).promise();

t.context.event = {
config: {
collection: { name: 'collection-name' },
provider: { name: 'provider-name' },
queueUrl: await createQueue(),
parsePdrMessageTemplateUri: `s3://${t.context.templateBucket}/${messageTemplateKey}`
},
input: {
pdrs: []
}
};
});

test.afterEach(async (t) => {
await Promise.all([
recursivelyDeleteS3Bucket(t.context.templateBucket),
sqs().deleteQueue({ QueueUrl: t.context.event.config.queueUrl }).promise()
]);
});

test('The correct output is returned when PDRs are queued', async (t) => {
const event = t.context.event;
event.input.pdrs = [
{ name: randomString(), path: randomString() },
{ name: randomString(), path: randomString() }
];

await validateConfig(t, event.config);
await validateInput(t, event.input);

const output = await queuePdrs(event);

await validateOutput(t, output);
t.deepEqual(output, { pdrs_queued: 2 });
});

test('The correct output is returned when no PDRs are queued', async (t) => {
const event = t.context.event;
event.input.pdrs = [];

await validateConfig(t, event.config);
await validateInput(t, event.input);

t.context.bucket = randomString();
return s3().createBucket({ Bucket: t.context.bucket }).promise();
const output = await queuePdrs(event);

await validateOutput(t, output);
t.deepEqual(output, { pdrs_queued: 0 });
});

test.afterEach.always((t) =>
Promise.all([
recursivelyDeleteS3Bucket(t.context.bucket),
sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise()
]));
test('PDRs are added to the queue', async (t) => {
const event = t.context.event;
event.input.pdrs = [
{ name: randomString(), path: randomString() },
{ name: randomString(), path: randomString() }
];

await validateConfig(t, event.config);
await validateInput(t, event.input);

test('queue pdrs', async (t) => {
const Bucket = t.context.bucket;
const ParsePdrTemplate = `s3://${Bucket}/dev/workflows/ParsePdr.json`;
const output = await queuePdrs(event);

await aws.s3().putObject({
Bucket,
Key: 'dev/workflows/ParsePdr.json',
Body: JSON.stringify(workflowTemplate)
await validateOutput(t, output);

// Get messages from the queue
const receiveMessageResponse = await sqs().receiveMessage({
QueueUrl: t.context.event.config.queueUrl,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 1
}).promise();
const messages = receiveMessageResponse.Messages;

MockAWS.stub('StepFunctions', 'describeExecution').returns({
promise: () => Promise.resolve({})
});
t.is(messages.length, 2);
});

test('The correct message is enqueued', async (t) => {
const event = t.context.event;
event.input.pdrs = [
{
name: randomString(),
path: randomString()
},
{
name: randomString(),
path: randomString()
},
];

await validateConfig(t, event.config);
await validateInput(t, event.input);

const output = await queuePdrs(event);

const input = Object.assign({}, inputJSON);
input.config.templateUri = ParsePdrTemplate;
input.config.queueUrl = t.context.queueUrl;
await validateOutput(t, output);

return handler(input, {}, (e, output) => {
t.ifError(e);
t.is(typeof output, 'object');
t.is(output.pdrs_queued, 2);
MockAWS.StepFunctions.restore();
// Get messages from the queue
const receiveMessageResponse = await sqs().receiveMessage({
QueueUrl: t.context.event.config.queueUrl,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 1
}).promise();
const messages = receiveMessageResponse.Messages.map((message) => JSON.parse(message.Body));

t.is(messages.length, 2);

const receivedPdrnames = messages.map((message) => message.payload.pdr.name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: (Here and elsewhere in this pr)

If your function takes a single argument and doesn’t use braces, omit the parentheses

https://github.com/airbnb/javascript#arrows--one-arg-parens

But this is not consistent across the repo, so I don't feel strongly about leaving as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Airbnb's guide says to omit the braces, but our project style guide overrides that:

https://github.com/cumulus-nasa/cumulus/blob/master/.eslintrc.json#L95

event.input.pdrs.map((pdr) => pdr.name).forEach((pdrName) =>
t.true(receivedPdrnames.includes(pdrName)));

// Figure out what messages we should have received for each PDR
const expectedMessages = {};
event.input.pdrs.forEach((pdr) => {
expectedMessages[pdr.name] = {
cumulus_meta: {
state_machine: t.context.stateMachineArn
},
meta: {
collection: { name: 'collection-name' },
provider: { name: 'provider-name' }
},
payload: {
pdr: {
name: pdr.name,
path: pdr.path
}
}
};
});

// Make sure we did receive those messages
messages.forEach((message) => {
const pdrName = message.payload.pdr.name;
t.deepEqual(message, expectedMessages[pdrName]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests are 💯

});
});

test.todo('An appropriate error is thrown if the message template could not be fetched');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a ticket for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 changes: 2 additions & 1 deletion packages/common/test-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ async function validateJSON(t, schemaFilename, data) {
const message = `${schemaName} validation failed: ${ajv.errorsText()}`;
console.log(message);
console.log(JSON.stringify(data, null, 2));
return t.fail(message);
t.fail(message);
throw new Error(message);
}
return valid;
}
Expand Down
Loading