Skip to content

Commit

Permalink
CUMULUS-175 : Dashboard providers not in sync with AWS providers (#234)
Browse files Browse the repository at this point in the history
**Summary:** Hooks up db-indexer lambda function to dynamoDB streams which replicates updates to elasticsearch. Update API endpoints to just use elastic search.

Closes #234 

Addresses [CUMULUS-175: Dashboard providers not in sync with AWS providers](https://bugs.earthdata.nasa.gov/browse/CUMULUS-175)

## Changes

* Update API endpoints for collections, providers and rules.
* Add happy path tests for updated endpoints
* Adds db-indexer lambda function and cloudformation configuration of event source of dynamo db streams
* Adds test for db-indexer lambda function, currently skipped because it only works locally

## Test Plan
- [x] Unit tests
- [x] Adhoc testing of dashboard: list, get, create, update, delete of collections, providers and rules
- [x] Testing of elasticsearch - checked instances exist in `<ELASTICSEARCH_DOMAIN>/_all/<collection|rule|provider>/_search?pretty=true`

Reviewers: @scisco
  • Loading branch information
abarciauskas-bgse committed Mar 8, 2018
1 parent b96dd06 commit da2df8a
Show file tree
Hide file tree
Showing 21 changed files with 704 additions and 87 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Fixed

- **CUMULUS-175: "Dashboard providers not in sync with AWS providers."** The root cause of this bug - DynamoDB operations not showing up in Elasticsearch - was shared by collections and rules. The fix was to update providers', collections' and rules; POST, PUT and DELETE endpoints to operate on DynamoDB and using DynamoDB streams to update Elasticsearch. The following packages were made:
- `@cumulus/deployment` deploys DynamoDB streams for the Collections, Providers and Rules tables as well as a new lambda function called `dbIndexer`. The `dbIndexer` lambda has an event source mapping which listens to each of the DynamoDB streams. The dbIndexer lambda receives events referencing operations on the DynamoDB table and updates the elasticsearch cluster accordingly.
- The `@cumulus/api` endpoints for collections, providers and rules _only_ query DynamoDB, with the exception of LIST endpoints and the collections' GET endpoint.

### Updated
- Broke up `kes.override.js` of @cumulus/deployment to multiple modules and moved to a new location
- Expanded @cumulus/deployment test coverage
Expand Down
5 changes: 3 additions & 2 deletions packages/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ See [Cumulus README](https://github.com/cumulus-nasa/cumulus/blob/master/README.
Running tests for kinesis-consumer depends on localstack. Once you have installed localstack, you can start it for dynamoDB only:

```
SERVICES=dynamodb localstack start
LAMBDA_EXECUTOR=docker localstack start
```

Then you can run tests locally via:

```bash
LOCALSTACK_HOST=localhost npm run test
export DOCKERHOST=$(ifconfig | grep -E "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep -v 127.0.0.1 | awk '{ print $2 }' | cut -f2 -d: | head -n1)
LOCALSTACK_HOST=localhost DOCKERHOST=${DOCKERHOST} IS_LOCAL=true npm run test
```
12 changes: 12 additions & 0 deletions packages/api/config/lambdas.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ sf2snsEnd:
memory: 256
source: 'node_modules/@cumulus/api/dist/'

dbIndexer:
handler: index.dbIndexer
timeout: 300
memory: 512
source: 'node_modules/@cumulus/api/dist/'
envs:
ES_HOST:
function: "Fn::GetAtt"
array:
- '{{es.name}}Domain'
- DomainEndpoint

kinesisConsumer:
handler: index.kinesisConsumer
timeout: 100
Expand Down
32 changes: 13 additions & 19 deletions packages/api/endpoints/collections.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const models = require('../models');
const Collection = require('../es/collections');
const RecordDoesNotExist = require('../lib/errors').RecordDoesNotExist;
const examplePayload = require('../tests/data/collections_post.json');
const { indexCollection, deleteRecord } = require('../es/indexer');

/**
* List all collections.
Expand All @@ -18,9 +17,7 @@ const { indexCollection, deleteRecord } = require('../es/indexer');
*/
function list(event, cb) {
const collection = new Collection(event);
collection.query().then(res => cb(null, res)).catch((e) => {
cb(e);
});
collection.query().then(res => cb(null, res)).catch(cb);
}

/**
Expand All @@ -37,8 +34,9 @@ function get(event, cb) {
.then((res) => {
const collection = new Collection(event);
return collection.getStats([res], [res.name]);
}).then(res => cb(null, res[0]))
.catch((e) => cb(e));
})
.then(res => cb(null, res[0]))
.catch(cb);
}

/**
Expand All @@ -63,10 +61,8 @@ function post(event, cb) {
.catch((e) => {
if (e instanceof RecordDoesNotExist) {
return c.create(data)
.then(() => Collection.es())
.then(esClient => indexCollection(esClient, data))
.then(() => cb(null, { message: 'Record saved', record: data }))
.catch(err => cb(err));
.catch(cb);
}
return cb(e);
});
Expand Down Expand Up @@ -94,11 +90,11 @@ function put(event, cb) {
const c = new models.Collection();

// get the record first
return c.get({ name, version }).then((originalData) => {
data = Object.assign({}, originalData, data);
return c.create(data);
}).then(() => Collection.es())
.then(esClient => indexCollection(esClient, data))
return c.get({ name, version })
.then((originalData) => {
data = Object.assign({}, originalData, data);
return c.create(data);
})
.then(() => cb(null, data))
.catch((err) => {
if (err instanceof RecordDoesNotExist) {
Expand All @@ -111,24 +107,22 @@ function put(event, cb) {
function del(event, cb) {
const name = _get(event.pathParameters, 'collectionName');
const version = _get(event.pathParameters, 'version');
const id = `${name}___${version}`;
const c = new models.Collection();

return c.get({ name, version })
.then(() => c.delete({ name, version }))
.then(() => Collection.es())
.then((esClient) => deleteRecord(esClient, id, 'collection'))
.then(() => cb(null, { message: 'Record deleted' }))
.catch(e => cb(e));
.catch(cb);
}

function handler(event, context) {
const httpMethod = _get(event, 'httpMethod');

if (!httpMethod) {
return context.fail('HttpMethod is missing');
}

return handle(event, context, true, (cb) => {
return handle(event, context, !process.env.TEST /* authCheck */, cb => {
if (event.httpMethod === 'GET' && event.pathParameters) {
get(event, cb);
}
Expand Down
43 changes: 15 additions & 28 deletions packages/api/endpoints/providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
'use strict';

const _get = require('lodash.get');
const handle = require('../lib/response').handle;
const { handle } = require('../lib/response');
const models = require('../models');
const Search = require('../es/search').Search;
const { deleteRecord, indexProvider } = require('../es/indexer');
const RecordDoesNotExist = require('../lib/errors').RecordDoesNotExist;
const { Search } = require('../es/search');

/**
* List all providers.
Expand All @@ -16,9 +15,7 @@ const RecordDoesNotExist = require('../lib/errors').RecordDoesNotExist;
*/
function list(event, cb) {
const search = new Search(event, 'provider');
search.query().then(response => cb(null, response)).catch((e) => {
cb(e);
});
search.query().then(response => cb(null, response)).catch(cb);
}

/**
Expand All @@ -38,7 +35,8 @@ function get(event, cb) {
.then((res) => {
delete res.password;
cb(null, res);
}).catch((e) => cb(e));
})
.catch(cb);
}

/**
Expand All @@ -58,12 +56,8 @@ function post(event, cb) {
.catch((e) => {
if (e instanceof RecordDoesNotExist) {
return p.create(data)
.then((r) => {
data = r;
return Search.es();
}).then(esClient => indexProvider(esClient, data))
.then(() => cb(null, { message: 'Record saved', record: data }))
.catch(err => cb(err));
.then(data => cb(null, { message: 'Record saved', record: data }))
.catch(err => cb(err));
}
return cb(e);
});
Expand Down Expand Up @@ -91,17 +85,12 @@ function put(event, cb) {
return p.get({ id }).then((d) => {
originalData = d;
return p.update({ id }, data);
}).then(() => {
data = Object.assign({}, originalData, data);
return Search.es();
}).then(esClient => indexProvider(esClient, data))
.then(() => cb(null, data))
.catch((err) => {
if (err instanceof RecordDoesNotExist) {
return cb({ message: 'Record does not exist' });
}
return cb(err);
});
})
.then(data => cb(null, data))
.catch((err) => {
if (err instanceof RecordDoesNotExist) cb({ message: 'Record does not exist' });
return cb(err);
});
}

function del(event, cb) {
Expand All @@ -110,14 +99,12 @@ function del(event, cb) {

return p.get({ id })
.then(() => p.delete({ id }))
.then(() => Search.es())
.then((esClient) => deleteRecord(esClient, id, 'provider'))
.then(() => cb(null, { message: 'Record deleted' }))
.catch(e => cb(e));
.catch(cb);
}

function handler(event, context) {
handle(event, context, true, (cb) => {
handle(event, context, !process.env.TEST /* authCheck */, (cb) => {
if (event.httpMethod === 'GET' && event.pathParameters) {
get(event, cb);
}
Expand Down
38 changes: 9 additions & 29 deletions packages/api/endpoints/rules.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ const _get = require('lodash.get');
const { justLocalRun } = require('@cumulus/common/local-helpers');
const { handle } = require('../lib/response');
const models = require('../models');
const { Search } = require('../es/search');
const { deleteRecord, indexRule } = require('../es/indexer');
const { RecordDoesNotExist } = require('../lib/errors');
const { Search } = require('../es/search');

/**
* List all providers.
Expand All @@ -17,9 +16,7 @@ const { RecordDoesNotExist } = require('../lib/errors');
*/
function list(event, cb) {
const search = new Search(event, 'rule');
search.query().then(response => cb(null, response)).catch((e) => {
cb(e);
});
search.query().then(response => cb(null, response)).catch(cb);
}

/**
Expand All @@ -36,7 +33,8 @@ function get(event, cb) {
.then((res) => {
delete res.password;
cb(null, res);
}).catch((e) => cb(e));
})
.catch(cb);
}

/**
Expand All @@ -56,12 +54,8 @@ function post(event, cb) {
.catch((e) => {
if (e instanceof RecordDoesNotExist) {
return model.create(data)
.then((r) => {
data = r;
return Search.es();
}).then(esClient => indexRule(esClient, data))
.then(() => cb(null, { message: 'Record saved', record: data }))
.catch(err => cb(err));
.then(r => cb(null, { message: 'Record saved', record: r }))
.catch(cb);
}
return cb(e);
});
Expand Down Expand Up @@ -110,9 +104,7 @@ async function put(event) {
return;
}

data = await model.update(originalData, data);
const esClient = await Search.es();
await indexRule(esClient, data);
return model.update(originalData, data);
}

async function del(event) {
Expand All @@ -121,15 +113,12 @@ async function del(event) {

name = name.replace(/%20/g, ' ');

const record = await model.get({ name });
await model.delete(record);
const esClient = await Search.es();
await deleteRecord(esClient, name, 'rule');
await model.get({ name }).then(record => model.delete(record));
return { message: 'Record deleted' };
}

function handler(event, context) {
handle(event, context, true, (cb) => {
return handle(event, context, !process.env.TEST /* authCheck */, cb => {
if (event.httpMethod === 'GET' && event.pathParameters) {
get(event, cb);
}
Expand All @@ -149,12 +138,3 @@ function handler(event, context) {
}

module.exports = handler;


justLocalRun(() => {
//put({ pathParameters: { name: 'discover_aster' }, body: '{"action":"rerun"}' }).then(r => console.log(r)).catch(e => console.log(e)); // eslint-disable-line max-len
//handler(postPayload, {
//succeed: r => console.log(r),
//failed: e => console.log(e)
//}, (e, r) => console.log(e, r));
});
1 change: 1 addition & 0 deletions packages/api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ exports.schemas = require('./endpoints/schemas');
exports.stats = require('./endpoints/stats');
exports.version = require('./endpoints/version');
exports.distribution = require('./endpoints/distribution');
exports.dbIndexer = require('./lambdas/db-indexer');

exports.jobs = require('./lambdas/jobs');
exports.bootstrap = require('./lambdas/bootstrap').handler;
Expand Down
Loading

0 comments on commit da2df8a

Please sign in to comment.