Skip to content

Commit

Permalink
Merge pull request #228 from cumulus-nasa/UpdateQueuePdrsTask
Browse files Browse the repository at this point in the history
Update queue-pdrs task to use the message adapter
  • Loading branch information
Marc committed Mar 5, 2018
2 parents 60256eb + 8f1780c commit 44aa570
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 126 deletions.
17 changes: 16 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,22 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]

### Added
- Integration test package with command line tool [CUMULUS-200] by @laurenfrederick
- Added a `jlog` function to `common/test-utils` to aid in test debugging
- Integration test package with command line tool [CUMULUS-200] by @laurenfrederick

### Updated
- The `queue-pdrs` task now uses the [cumulus-message-adapter-js](https://github.com/cumulus-nasa/cumulus-message-adapter-js)
library
- Updated the `queue-pdrs` JSON schemas
- The test-utils schema validation functions now throw an error if validation
fails
- The `queue-granules` task now uses the [cumulus-message-adapter-js](https://github.com/cumulus-nasa/cumulus-message-adapter-js)
library
- Updated the `queue-granules` JSON schemas

### Removed
- Removed the `getSfnExecutionByName` function from `common/aws`
- Removed the `getGranuleStatus` function from `common/aws`

## [v1.0.1] - 2018-02-27

Expand Down
65 changes: 32 additions & 33 deletions cumulus/tasks/queue-pdrs/index.js
Original file line number Diff line number Diff line change
@@ -1,42 +1,41 @@
'use strict';

import { queuePdr } from '@cumulus/ingest/queue';
const log = require('@cumulus/common/log');
const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js');
const { enqueueParsePdrMessage } = require('@cumulus/ingest/queue');

/**
* Callback function provided by aws lambda. See https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-handler.html#nodejs-prog-model-handler-callback
* @callback lambdaCallback
* @param {object} error
* @param {object} output - output object matching schemas/output.json
* @param {integer} output.pdrs_queued
*/

/**
* For each PDR, generate a new SF messages send to the step function queue to be executed
* @param {object} event lambda event object
* @param {object} event.input
* @param {array} event.input.pdrs
* @param {object} context Lambda context object. See https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-context.html
* @param {lambdaCallback} callback callback function
* @return {undefined}
* See schemas/input.json and schemas/config.json for detailed event description
*
* @param {Object} event - Lambda event object
* @returns {Promise} - see schemas/output.json for detailed output schema
* that is passed to the next task in the workflow
**/
function handler(event, context, cb) {
async function queuePdrs(event) {
const pdrs = event.input.pdrs || [];
const config = event.config;
const queuedPdrs = pdrs.map((pdr) => queuePdr(
config.queueUrl,
config.templateUri,
config.provider,
config.collection,
pdr
));

return Promise.all(queuedPdrs).then(() => {
cb(null, { pdrs_queued: queuedPdrs.length });
}).catch(e => {
log.error(e);
return cb(e);
});
await Promise.all(
pdrs.map((pdr) => enqueueParsePdrMessage(
pdr,
event.config.queueUrl,
event.config.parsePdrMessageTemplateUri,
event.config.provider,
event.config.collection
))
);

return { pdrs_queued: pdrs.length };
}
exports.queuePdrs = queuePdrs;

module.exports.handler = handler;
/**
* Lambda handler
*
* @param {Object} event - a Cumulus Message
* @param {Object} context - an AWS Lambda context
* @param {Function} callback - an AWS Lambda handler
* @returns {undefined} - does not return a value
*/
function handler(event, context, callback) {
cumulusMessageAdapter.runCumulusTask(queuePdrs, event, context, callback);
}
exports.handler = handler;
1 change: 1 addition & 0 deletions cumulus/tasks/queue-pdrs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"license": "Apache-2.0",
"dependencies": {
"@cumulus/common": "^1.0.1",
"@cumulus/cumulus-message-adapter-js": "0.0.1-beta.3",
"@cumulus/ingest": "^1.0.1",
"babel-core": "^6.25.0",
"babel-loader": "^6.2.4",
Expand Down
18 changes: 18 additions & 0 deletions cumulus/tasks/queue-pdrs/schemas/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"title": "QueuePdrsConfig",
"description": "Describes the config used by the queue-pdrs task",
"type": "object",
"required": [
"collection",
"provider",
"queueUrl",
"parsePdrMessageTemplateUri"
],
"additionalProperties": false,
"properties": {
"collection": { "type": "object" },
"provider": { "type": "object" },
"queueUrl": { "type": "string" },
"parsePdrMessageTemplateUri": { "type": "string" }
}
}
41 changes: 0 additions & 41 deletions cumulus/tasks/queue-pdrs/schemas/config.json.txt

This file was deleted.

File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
"title": "QueuePdrsOutput",
"description": "Describes the output produced by the queue-pdrs task",
"type": "object",
"required": [ "pdrs_queued" ],
"additionalProperties": false,
"properties": {
"pdrs_queued": {
"type": "integer"
Expand Down
5 changes: 5 additions & 0 deletions cumulus/tasks/queue-pdrs/tests/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"rules": {
"no-param-reassign": "off"
}
}
185 changes: 151 additions & 34 deletions cumulus/tasks/queue-pdrs/tests/index.js
Original file line number Diff line number Diff line change
@@ -1,53 +1,170 @@
/* eslint-disable no-param-reassign */
'use strict';

const test = require('ava');
const MockAWS = require('@mapbox/mock-aws-sdk-js');

const { s3, sqs, recursivelyDeleteS3Bucket } = require('@cumulus/common/aws');
const { createQueue, randomString } = require('@cumulus/common/test-utils');
const {
createQueue,
randomString,
validateConfig,
validateInput,
validateOutput
} = require('@cumulus/common/test-utils');

const { handler } = require('../index');
const inputJSON = require('./fixtures/input.json');
const workflowTemplate = require('./fixtures/workflow-template.json');

const aws = require('@cumulus/common/aws');
const { queuePdrs } = require('../index');

test.beforeEach(async (t) => {
t.context.queueUrl = await createQueue();
t.context.templateBucket = randomString();
await s3().createBucket({ Bucket: t.context.templateBucket }).promise();

t.context.stateMachineArn = randomString();

t.context.messageTemplate = {
cumulus_meta: {
state_machine: t.context.stateMachineArn
},
meta: {}
};
const messageTemplateKey = `${randomString()}/template.json`;
await s3().putObject({
Bucket: t.context.templateBucket,
Key: messageTemplateKey,
Body: JSON.stringify(t.context.messageTemplate)
}).promise();

t.context.event = {
config: {
collection: { name: 'collection-name' },
provider: { name: 'provider-name' },
queueUrl: await createQueue(),
parsePdrMessageTemplateUri: `s3://${t.context.templateBucket}/${messageTemplateKey}`
},
input: {
pdrs: []
}
};
});

test.afterEach(async (t) => {
await Promise.all([
recursivelyDeleteS3Bucket(t.context.templateBucket),
sqs().deleteQueue({ QueueUrl: t.context.event.config.queueUrl }).promise()
]);
});

test('The correct output is returned when PDRs are queued', async (t) => {
const event = t.context.event;
event.input.pdrs = [
{ name: randomString(), path: randomString() },
{ name: randomString(), path: randomString() }
];

await validateConfig(t, event.config);
await validateInput(t, event.input);

const output = await queuePdrs(event);

await validateOutput(t, output);
t.deepEqual(output, { pdrs_queued: 2 });
});

test('The correct output is returned when no PDRs are queued', async (t) => {
const event = t.context.event;
event.input.pdrs = [];

await validateConfig(t, event.config);
await validateInput(t, event.input);

t.context.bucket = randomString();
return s3().createBucket({ Bucket: t.context.bucket }).promise();
const output = await queuePdrs(event);

await validateOutput(t, output);
t.deepEqual(output, { pdrs_queued: 0 });
});

test.afterEach.always((t) =>
Promise.all([
recursivelyDeleteS3Bucket(t.context.bucket),
sqs().deleteQueue({ QueueUrl: t.context.queueUrl }).promise()
]));
test('PDRs are added to the queue', async (t) => {
const event = t.context.event;
event.input.pdrs = [
{ name: randomString(), path: randomString() },
{ name: randomString(), path: randomString() }
];

await validateConfig(t, event.config);
await validateInput(t, event.input);

test('queue pdrs', async (t) => {
const Bucket = t.context.bucket;
const ParsePdrTemplate = `s3://${Bucket}/dev/workflows/ParsePdr.json`;
const output = await queuePdrs(event);

await aws.s3().putObject({
Bucket,
Key: 'dev/workflows/ParsePdr.json',
Body: JSON.stringify(workflowTemplate)
await validateOutput(t, output);

// Get messages from the queue
const receiveMessageResponse = await sqs().receiveMessage({
QueueUrl: t.context.event.config.queueUrl,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 1
}).promise();
const messages = receiveMessageResponse.Messages;

MockAWS.stub('StepFunctions', 'describeExecution').returns({
promise: () => Promise.resolve({})
});
t.is(messages.length, 2);
});

test('The correct message is enqueued', async (t) => {
const event = t.context.event;
event.input.pdrs = [
{
name: randomString(),
path: randomString()
},
{
name: randomString(),
path: randomString()
},
];

await validateConfig(t, event.config);
await validateInput(t, event.input);

const output = await queuePdrs(event);

const input = Object.assign({}, inputJSON);
input.config.templateUri = ParsePdrTemplate;
input.config.queueUrl = t.context.queueUrl;
await validateOutput(t, output);

return handler(input, {}, (e, output) => {
t.ifError(e);
t.is(typeof output, 'object');
t.is(output.pdrs_queued, 2);
MockAWS.StepFunctions.restore();
// Get messages from the queue
const receiveMessageResponse = await sqs().receiveMessage({
QueueUrl: t.context.event.config.queueUrl,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 1
}).promise();
const messages = receiveMessageResponse.Messages.map((message) => JSON.parse(message.Body));

t.is(messages.length, 2);

const receivedPdrnames = messages.map((message) => message.payload.pdr.name);
event.input.pdrs.map((pdr) => pdr.name).forEach((pdrName) =>
t.true(receivedPdrnames.includes(pdrName)));

// Figure out what messages we should have received for each PDR
const expectedMessages = {};
event.input.pdrs.forEach((pdr) => {
expectedMessages[pdr.name] = {
cumulus_meta: {
state_machine: t.context.stateMachineArn
},
meta: {
collection: { name: 'collection-name' },
provider: { name: 'provider-name' }
},
payload: {
pdr: {
name: pdr.name,
path: pdr.path
}
}
};
});

// Make sure we did receive those messages
messages.forEach((message) => {
const pdrName = message.payload.pdr.name;
t.deepEqual(message, expectedMessages[pdrName]);
});
});

test.todo('An appropriate error is thrown if the message template could not be fetched');
3 changes: 2 additions & 1 deletion packages/common/test-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ async function validateJSON(t, schemaFilename, data) {
const message = `${schemaName} validation failed: ${ajv.errorsText()}`;
console.log(message);
console.log(JSON.stringify(data, null, 2));
return t.fail(message);
t.fail(message);
throw new Error(message);
}
return valid;
}
Expand Down
Loading

0 comments on commit 44aa570

Please sign in to comment.