CUMULUS-359 kinesis consumer uses sf-scheduler (#262)
**Summary:** Kinesis consumer uses sf-scheduler

Closes #262 

Addresses [CUMULUS-359](https://bugs.earthdata.nasa.gov/browse/CUMULUS-359)

## Detailed Changes

* The Kinesis consumer now calls the sf-scheduler function `schedule` (a condensed sketch of the new call path follows this list)
* Updated tests
* The Kinesis consumer Lambda now requires environment variables naming the collections and providers tables used by `sf-scheduler#schedule`
* (Unrelated) Fixes ESLint errors in `packages/api/lambdas`
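
A condensed sketch of the new call path, adapted from `queueMessageForRule` in `packages/api/lambdas/kinesis-consumer.js`. The `item` fields other than `payload` are assumptions for illustration:

```js
// sf-scheduler's `schedule` is a callback-style Lambda handler, so the
// consumer wraps it in a Promise instead of placing a message on the
// startSF queue itself. The consumer's environment must now supply
// CollectionsTable and ProvidersTable (wired up in
// packages/api/config/lambdas.yml, shown below).
const sfSchedule = require('./sf-scheduler');
const Rule = require('../models/rules');

async function queueMessageForRule(kinesisRule, eventObject) {
  const item = {
    workflow: kinesisRule.workflow, // field names here are assumed
    provider: kinesisRule.provider,
    collection: kinesisRule.collection,
    payload: eventObject // confirmed by the diff below
  };

  const payload = await Rule.buildPayload(item);
  return new Promise((resolve, reject) => sfSchedule(payload, {}, (err, result) => {
    if (err) reject(err);
    else resolve(result);
  }));
}
```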

## Test Plan
- [x] Unit tests
- [x] Ad hoc testing

Reviewers: @laurenfrederick
abarciauskas-bgse committed Mar 19, 2018
1 parent ace231f commit 566a429
Showing 13 changed files with 176 additions and 153 deletions.
2 changes: 1 addition & 1 deletion .eslint-ratchet-high-water-mark
@@ -1 +1 @@
1910
1874
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -5,16 +5,25 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Fixed
- Update vulnerable npm packages [CUMULUS-425]
- `@cumulus/api`: `kinesis-consumer.js` uses `sf-scheduler.js#schedule` instead of placing a message directly on the `startSF` SQS queue. This is a fix for [CUMULUS-359](https://bugs.earthdata.nasa.gov/browse/CUMULUS-359) because `sf-scheduler.js#schedule` looks up the provider and collection data in DynamoDB and adds it to the `meta` object of the enqueued message payload.
- `@cumulus/api`: `kinesis-consumer.js` catches and logs errors instead of doing an error callback. Before this change, `kinesis-consumer` failed to process new records whenever an existing record caused an error: it would call back with the error and stop processing additional records, then keep retrying the failing record because its position in the stream was unchanged. Catching and logging the errors is part 1 of the fix. Proposed part 2 is to enqueue the error and the message on a "dead-letter" queue so it can be processed later ([CUMULUS-413](https://bugs.earthdata.nasa.gov/browse/CUMULUS-413)).

### Removed

- `@cumulus/ingest/aws`: Remove `queueWorkflowMessage`, which is no longer used by `@cumulus/api`'s `kinesis-consumer.js`.

## [v1.1.4] - 2018-03-15
### Added
- added flag `useList` to parse-pdr [CUMULUS-404]

### Fixed

- Pass encrypted password to the ApiGranule Lambda function [CUMULUS-424]


## [v1.1.3] - 2018-03-14
### Fixed
- Changed @cumulus/deployment package install behavior. The build process will happen after installation
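
A note on the CHANGELOG entry above: `sf-scheduler.js#schedule` enriches the enqueued message's `meta` with the provider and collection records it looks up in DynamoDB before sending the message to the `startSF` queue. A hedged sketch of the resulting message shape; only the `meta.provider`, `meta.collection`, and `meta.queues.startSF` keys are confirmed by this diff, and the values are illustrative:

```js
// Hypothetical message as enqueued by sf-scheduler#schedule.
const message = {
  meta: {
    queues: { startSF: '<startSF queue URL>' }, // key path confirmed by the diff
    provider: { id: 'example-provider' },       // looked up in the Providers table
    collection: { name: 'example-collection' }  // looked up in the Collections table
  },
  payload: {} // the original workflow payload
};
```
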
7 changes: 5 additions & 2 deletions packages/api/config/lambdas.yml
@@ -61,9 +61,12 @@ kinesisConsumer:
RulesTable:
function: Ref
value: RulesTableDynamoDB
invoke:
CollectionsTable:
function: Ref
value: ScheduleSFLambdaFunction
value: CollectionsTableDynamoDB
ProvidersTable:
function: Ref
value: ProvidersTableDynamoDB
bucket: '{{buckets.internal}}'

ScheduleSF:
17 changes: 8 additions & 9 deletions packages/api/lambdas/bootstrap.js
@@ -10,6 +10,7 @@
* - creating API users
* - encrypting CMR user/pass and adding it to configuration files
*/

'use strict';

const https = require('https');
@@ -44,19 +45,17 @@ async function bootstrapElasticSearch(host, index = 'cumulus') {
else {
log.info(`index ${index} already exists`);
}

return;
}

async function bootstrapUsers(table, records) {
if (!table) {
return new Promise(resolve => resolve());
return new Promise((resolve) => resolve());
}
const user = new Manager(table);

// delete all user records
const existingUsers = await user.scan();
await Promise.all(existingUsers.Items.map(u => user.delete({ userName: u.userName })));
await Promise.all(existingUsers.Items.map((u) => user.delete({ userName: u.userName })));
// add new ones
const additions = records.map((record) => user.create({
userName: record.username,
@@ -69,7 +68,7 @@ async function bootstrapUsers(table, records) {

async function bootstrapCmrProvider(password) {
if (!password) {
return new Promise(resolve => resolve('nopassword'));
return new Promise((resolve) => resolve('nopassword'));
}
return DefaultProvider.encrypt(password);
}
@@ -140,7 +139,7 @@ function handler(event, context, cb) {
};

return sendResponse(event, 'SUCCESS', data, cb);
}).catch(e => {
}).catch((e) => {
log.error(e);
return sendResponse(event, 'FAILED', null, cb);
});
@@ -155,8 +154,8 @@ justLocalRun(() => {
//const a = {};
//handler(a, {}, (e, r) => console.log(e, r));
//bootstrapCmrProvider('testing').then(r => {
//console.log(r)
//return DefaultProvider.decrypt(r)
//console.log(r)
//return DefaultProvider.decrypt(r)
//}).then(r => console.log(r))
//.catch(e => console.log(e));
//.catch(e => console.log(e));
});
13 changes: 6 additions & 7 deletions packages/api/lambdas/db-indexer.js
@@ -17,7 +17,7 @@ function indexRecord(esClient, record) {

//determine whether the record should be indexed
const acceptedTables = ['Collection', 'Provider', 'Rule'];
const tableConfig = {}
const tableConfig = {};
acceptedTables.forEach((a) => {
tableConfig[`${stack}-${a}sTable`] = indexer[`index${a}`];
});
@@ -54,21 +54,20 @@
}

async function indexRecords(records) {
const concurrencyLimit = process.env.CONCURRENCY || 3
const concurrencyLimit = process.env.CONCURRENCY || 3;
const limit = pLimit(concurrencyLimit);
const esClient = await Search.es();

const promises = records.map((record) => limit(
() => indexRecord(esClient, record)
));
const promises = records.map((record) => limit(() => indexRecord(esClient, record)));
return Promise.all(promises);
}

/**
* Sync changes to dynamodb to an elasticsearch instance.
* Sending updates to this lambda is handled automatically by AWS.
* @param {array} Records list of records with an eventName property signifying REMOVE or INSERT.
* @return {string} response text indicating the number of records altered in elasticsearch.
*
* @param {Array} Records - list of records with an eventName property signifying REMOVE or INSERT.
* @returns {string} response text indicating the number of records altered in elasticsearch.
*/
function handler(event, context, cb) {
const records = event.Records;
23 changes: 7 additions & 16 deletions packages/api/lambdas/jobs.js
@@ -1,4 +1,5 @@
/* runs a bunch of periodic jobs to keep the database up to date */

'use strict';

const get = require('lodash.get');
@@ -19,8 +20,8 @@ async function findStaleRecords(type, q, limit = 100, page = 1) {
const response = await search.query();

//if (response.results.length >= limit) {
//const more = await findStaleRecords(type, q, limit, page + 1);
//return response.results.concat(more);
//const more = await findStaleRecords(type, q, limit, page + 1);
//return response.results.concat(more);
//}
return response.results;
}
@@ -29,16 +30,12 @@ async function updateGranulesAndPdrs(esClient, url, error) {
// find related granule and update their status
let searchTerm = `execution:"${url}"`;
const granules = await findStaleRecords('granule', searchTerm, 100);
await Promise.all(granules.map(g => partialRecordUpdate(
esClient, g.granuleId, 'granule', { status: 'failed', error }, g.collectionId
)));
await Promise.all(granules.map((g) => partialRecordUpdate(esClient, g.granuleId, 'granule', { status: 'failed', error }, g.collectionId)));

// find related pdrs and update their status
searchTerm = `execution:"${url}"`;
const pdrs = await findStaleRecords('pdr', searchTerm, 100);
await Promise.all(pdrs.map(p => partialRecordUpdate(
esClient, p.pdrName, 'pdr', { status: 'failed', error }
)));
await Promise.all(pdrs.map((p) => partialRecordUpdate(esClient, p.pdrName, 'pdr', { status: 'failed', error })));
}

async function checkExecution(arn, url, timestamp, esClient) {
@@ -133,17 +130,11 @@ async function cleanup() {

const limit = pLimit(2);

await Promise.all(
executions.slice(0, 400).map(
ex => limit(
() => checkExecution(ex.arn, ex.execution, ex.timestamp, esClient)
)
)
);
await Promise.all(executions.slice(0, 400).map((ex) => limit(() => checkExecution(ex.arn, ex.execution, ex.timestamp, esClient))));
}

function handler(event, context, cb) {
cleanup().then(() => cb()).catch(e => {
cleanup().then(() => cb()).catch((e) => {
log.error(e);
cb(e);
});
25 changes: 18 additions & 7 deletions packages/api/lambdas/kinesis-consumer.js
@@ -1,10 +1,13 @@
/* eslint-disable require-yield */

'use strict';

const Ajv = require('ajv');

const log = require('@cumulus/common/log');
const Rule = require('../models/rules');
const messageSchema = require('./kinesis-consumer-event-schema.json');
const { queueWorkflowMessage } = require('@cumulus/ingest/queue');
const sfSchedule = require('./sf-scheduler');

/**
* `getKinesisRules` scans and returns DynamoDB rules table for enabled,
@@ -14,7 +17,7 @@ const { queueWorkflowMessage } = require('@cumulus/ingest/queue');
* @returns {Array} List of zero or more rules found from table scan
*/
async function getKinesisRules(event) {
const collection = event.collection;
const { collection } = event;
const model = new Rule();
const kinesisRules = await model.scan({
names: {
@@ -51,8 +54,11 @@ async function queueMessageForRule(kinesisRule, eventObject) {
payload: eventObject
};

return Rule.buildPayload(item)
.then(queueWorkflowMessage);
const payload = await Rule.buildPayload(item);
return new Promise((resolve, reject) => sfSchedule(payload, {}, (err, result) => {
if (err) reject(err);
resolve(result);
}));
}

/**
@@ -63,10 +69,10 @@ async function queueMessageForRule(kinesisRule, eventObject) {
* @returns {(error|Object)} Throws an Ajv.ValidationError if event object is invalid.
* Returns the event object if event is valid.
*/
async function validateMessage(event) {
function validateMessage(event) {
const ajv = new Ajv({ allErrors: true });
const validate = ajv.compile(messageSchema);
return await validate(event);
return validate(event);
}

/**
@@ -86,7 +92,12 @@ function processRecord(record) {
.then(getKinesisRules)
.then((kinesisRules) => (
Promise.all(kinesisRules.map((kinesisRule) => queueMessageForRule(kinesisRule, eventObject)))
));
))
.catch((err) => {
log.error('Caught error in process record:');
log.error(err);
return err;
});
}

/**
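
A note on the error handling added above: `processRecord` now resolves with the caught error instead of rejecting, so processing of the remaining records in a batch continues and the Kinesis iterator can advance past a bad record. The Lambda handler itself is outside this hunk; an assumed sketch of the pattern this implies:

```js
// Assumed handler shape (not shown in this diff). Because processRecord
// resolves with caught errors rather than rejecting, Promise.all succeeds
// and the whole batch is acknowledged even when individual records fail.
function handler(event, context, cb) {
  return Promise.all(event.Records.map(processRecord))
    .then((results) => cb(null, results))
    .catch((err) => cb(err));
}
```
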
13 changes: 8 additions & 5 deletions packages/api/lambdas/sf-scheduler.js
@@ -10,9 +10,10 @@ const { Provider, Collection } = require('../models');
* Builds a cumulus-compatible message and adds it to the startSF queue
* startSF queue will then start a stepfunction for the given message
*
* @param {object} event lambda input message
* @param {object} context lambda context
* @param {function} cb lambda callback
* @param {Object} event - lambda input message
* @param {Object} context - lambda context
* @param {function} cb - lambda callback
* @returns {function} Calls callback with result of SQS.sendMessage or error
*/
function schedule(event, context, cb) {
const template = get(event, 'template');
@@ -53,9 +54,11 @@
.then((c) => {
if (c) message.meta.collection = c;
})
.then(() => SQS.sendMessage(message.meta.queues.startSF, message))
.then(() => {
SQS.sendMessage(message.meta.queues.startSF, message);
})
.then((r) => cb(null, r))
.catch(e => cb(e));
.catch((e) => cb(e));
}

module.exports = schedule;