diff --git a/CHANGELOG.md b/CHANGELOG.md index 00ded0d02e1..fba62c472e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Fixed + +- **CUMULUS-175: "Dashboard providers not in sync with AWS providers."** The root cause of this bug - DynamoDB operations not showing up in Elasticsearch - was shared by collections and rules. The fix was to update providers', collections' and rules' POST, PUT and DELETE endpoints to operate on DynamoDB and to use DynamoDB streams to update Elasticsearch. The following changes were made: + - `@cumulus/deployment` deploys DynamoDB streams for the Collections, Providers and Rules tables as well as a new lambda function called `dbIndexer`. The `dbIndexer` lambda has an event source mapping which listens to each of the DynamoDB streams. The dbIndexer lambda receives events referencing operations on the DynamoDB table and updates the elasticsearch cluster accordingly. + - The `@cumulus/api` endpoints for collections, providers and rules _only_ query DynamoDB, with the exception of LIST endpoints and the collections' GET endpoint. + ### Updated - Broke up `kes.override.js` of @cumulus/deployment to multiple modules and moved to a new location - Expanded @cumulus/deployment test coverage diff --git a/packages/api/README.md b/packages/api/README.md index 38892e10813..ac8e2ae72f0 100644 --- a/packages/api/README.md +++ b/packages/api/README.md @@ -29,11 +29,12 @@ See [Cumulus README](https://github.com/cumulus-nasa/cumulus/blob/master/README. Running tests for kinesis-consumer depends on localstack. 
Once you have installed localstack, you can start it for dynamoDB only: ``` -SERVICES=dynamodb localstack start +LAMBDA_EXECUTOR=docker localstack start ``` Then you can run tests locally via: ```bash -LOCALSTACK_HOST=localhost npm run test +export DOCKERHOST=$(ifconfig | grep -E "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep -v 127.0.0.1 | awk '{ print $2 }' | cut -f2 -d: | head -n1) +LOCALSTACK_HOST=localhost DOCKERHOST=${DOCKERHOST} IS_LOCAL=true npm run test ``` diff --git a/packages/api/config/lambdas.yml b/packages/api/config/lambdas.yml index 581a51d9977..24b8afaddfa 100644 --- a/packages/api/config/lambdas.yml +++ b/packages/api/config/lambdas.yml @@ -40,6 +40,18 @@ sf2snsEnd: memory: 256 source: 'node_modules/@cumulus/api/dist/' +dbIndexer: + handler: index.dbIndexer + timeout: 300 + memory: 512 + source: 'node_modules/@cumulus/api/dist/' + envs: + ES_HOST: + function: "Fn::GetAtt" + array: + - '{{es.name}}Domain' + - DomainEndpoint + kinesisConsumer: handler: index.kinesisConsumer timeout: 100 diff --git a/packages/api/endpoints/collections.js b/packages/api/endpoints/collections.js index 735ead03dd4..ddd8f3b7bcc 100644 --- a/packages/api/endpoints/collections.js +++ b/packages/api/endpoints/collections.js @@ -8,7 +8,6 @@ const models = require('../models'); const Collection = require('../es/collections'); const RecordDoesNotExist = require('../lib/errors').RecordDoesNotExist; const examplePayload = require('../tests/data/collections_post.json'); -const { indexCollection, deleteRecord } = require('../es/indexer'); /** * List all collections. 
@@ -18,9 +17,7 @@ const { indexCollection, deleteRecord } = require('../es/indexer'); */ function list(event, cb) { const collection = new Collection(event); - collection.query().then(res => cb(null, res)).catch((e) => { - cb(e); - }); + collection.query().then(res => cb(null, res)).catch(cb); } /** @@ -37,8 +34,9 @@ function get(event, cb) { .then((res) => { const collection = new Collection(event); return collection.getStats([res], [res.name]); - }).then(res => cb(null, res[0])) - .catch((e) => cb(e)); + }) + .then(res => cb(null, res[0])) + .catch(cb); } /** @@ -63,10 +61,8 @@ function post(event, cb) { .catch((e) => { if (e instanceof RecordDoesNotExist) { return c.create(data) - .then(() => Collection.es()) - .then(esClient => indexCollection(esClient, data)) .then(() => cb(null, { message: 'Record saved', record: data })) - .catch(err => cb(err)); + .catch(cb); } return cb(e); }); @@ -94,11 +90,11 @@ function put(event, cb) { const c = new models.Collection(); // get the record first - return c.get({ name, version }).then((originalData) => { - data = Object.assign({}, originalData, data); - return c.create(data); - }).then(() => Collection.es()) - .then(esClient => indexCollection(esClient, data)) + return c.get({ name, version }) + .then((originalData) => { + data = Object.assign({}, originalData, data); + return c.create(data); + }) .then(() => cb(null, data)) .catch((err) => { if (err instanceof RecordDoesNotExist) { @@ -111,24 +107,22 @@ function put(event, cb) { function del(event, cb) { const name = _get(event.pathParameters, 'collectionName'); const version = _get(event.pathParameters, 'version'); - const id = `${name}___${version}`; const c = new models.Collection(); return c.get({ name, version }) .then(() => c.delete({ name, version })) - .then(() => Collection.es()) - .then((esClient) => deleteRecord(esClient, id, 'collection')) .then(() => cb(null, { message: 'Record deleted' })) - .catch(e => cb(e)); + .catch(cb); } function handler(event, 
context) { const httpMethod = _get(event, 'httpMethod'); + if (!httpMethod) { return context.fail('HttpMethod is missing'); } - return handle(event, context, true, (cb) => { + return handle(event, context, !process.env.TEST /* authCheck */, cb => { if (event.httpMethod === 'GET' && event.pathParameters) { get(event, cb); } diff --git a/packages/api/endpoints/providers.js b/packages/api/endpoints/providers.js index 8d7d2d2325a..0fa506e4e8d 100644 --- a/packages/api/endpoints/providers.js +++ b/packages/api/endpoints/providers.js @@ -2,11 +2,10 @@ 'use strict'; const _get = require('lodash.get'); -const handle = require('../lib/response').handle; +const { handle } = require('../lib/response'); const models = require('../models'); -const Search = require('../es/search').Search; -const { deleteRecord, indexProvider } = require('../es/indexer'); const RecordDoesNotExist = require('../lib/errors').RecordDoesNotExist; +const { Search } = require('../es/search'); /** * List all providers. @@ -16,9 +15,7 @@ const RecordDoesNotExist = require('../lib/errors').RecordDoesNotExist; */ function list(event, cb) { const search = new Search(event, 'provider'); - search.query().then(response => cb(null, response)).catch((e) => { - cb(e); - }); + search.query().then(response => cb(null, response)).catch(cb); } /** @@ -38,7 +35,8 @@ function get(event, cb) { .then((res) => { delete res.password; cb(null, res); - }).catch((e) => cb(e)); + }) + .catch(cb); } /** @@ -58,12 +56,8 @@ function post(event, cb) { .catch((e) => { if (e instanceof RecordDoesNotExist) { return p.create(data) - .then((r) => { - data = r; - return Search.es(); - }).then(esClient => indexProvider(esClient, data)) - .then(() => cb(null, { message: 'Record saved', record: data })) - .catch(err => cb(err)); + .then(data => cb(null, { message: 'Record saved', record: data })) + .catch(err => cb(err)); } return cb(e); }); @@ -91,17 +85,12 @@ function put(event, cb) { return p.get({ id }).then((d) => { originalData = d; 
return p.update({ id }, data); - }).then(() => { - data = Object.assign({}, originalData, data); - return Search.es(); - }).then(esClient => indexProvider(esClient, data)) - .then(() => cb(null, data)) - .catch((err) => { - if (err instanceof RecordDoesNotExist) { - return cb({ message: 'Record does not exist' }); - } - return cb(err); - }); + }) + .then(data => cb(null, data)) + .catch((err) => { + if (err instanceof RecordDoesNotExist) cb({ message: 'Record does not exist' }); + return cb(err); + }); } function del(event, cb) { @@ -110,14 +99,12 @@ function del(event, cb) { return p.get({ id }) .then(() => p.delete({ id })) - .then(() => Search.es()) - .then((esClient) => deleteRecord(esClient, id, 'provider')) .then(() => cb(null, { message: 'Record deleted' })) - .catch(e => cb(e)); + .catch(cb); } function handler(event, context) { - handle(event, context, true, (cb) => { + handle(event, context, !process.env.TEST /* authCheck */, (cb) => { if (event.httpMethod === 'GET' && event.pathParameters) { get(event, cb); } diff --git a/packages/api/endpoints/rules.js b/packages/api/endpoints/rules.js index fe08ff1dd85..5c78f005d02 100644 --- a/packages/api/endpoints/rules.js +++ b/packages/api/endpoints/rules.js @@ -5,9 +5,8 @@ const _get = require('lodash.get'); const { justLocalRun } = require('@cumulus/common/local-helpers'); const { handle } = require('../lib/response'); const models = require('../models'); -const { Search } = require('../es/search'); -const { deleteRecord, indexRule } = require('../es/indexer'); const { RecordDoesNotExist } = require('../lib/errors'); +const { Search } = require('../es/search'); /** * List all providers. 
@@ -17,9 +16,7 @@ const { RecordDoesNotExist } = require('../lib/errors'); */ function list(event, cb) { const search = new Search(event, 'rule'); - search.query().then(response => cb(null, response)).catch((e) => { - cb(e); - }); + search.query().then(response => cb(null, response)).catch(cb); } /** @@ -36,7 +33,8 @@ function get(event, cb) { .then((res) => { delete res.password; cb(null, res); - }).catch((e) => cb(e)); + }) + .catch(cb); } /** @@ -56,12 +54,8 @@ function post(event, cb) { .catch((e) => { if (e instanceof RecordDoesNotExist) { return model.create(data) - .then((r) => { - data = r; - return Search.es(); - }).then(esClient => indexRule(esClient, data)) - .then(() => cb(null, { message: 'Record saved', record: data })) - .catch(err => cb(err)); + .then(r => cb(null, { message: 'Record saved', record: r })) + .catch(cb); } return cb(e); }); @@ -110,9 +104,7 @@ async function put(event) { return; } - data = await model.update(originalData, data); - const esClient = await Search.es(); - await indexRule(esClient, data); + return model.update(originalData, data); } async function del(event) { @@ -121,15 +113,12 @@ async function del(event) { name = name.replace(/%20/g, ' '); - const record = await model.get({ name }); - await model.delete(record); - const esClient = await Search.es(); - await deleteRecord(esClient, name, 'rule'); + await model.get({ name }).then(record => model.delete(record)); return { message: 'Record deleted' }; } function handler(event, context) { - handle(event, context, true, (cb) => { + return handle(event, context, !process.env.TEST /* authCheck */, cb => { if (event.httpMethod === 'GET' && event.pathParameters) { get(event, cb); } @@ -149,12 +138,3 @@ function handler(event, context) { } module.exports = handler; - - -justLocalRun(() => { - //put({ pathParameters: { name: 'discover_aster' }, body: '{"action":"rerun"}' }).then(r => console.log(r)).catch(e => console.log(e)); // eslint-disable-line max-len - //handler(postPayload, 
{ - //succeed: r => console.log(r), - //failed: e => console.log(e) - //}, (e, r) => console.log(e, r)); -}); diff --git a/packages/api/index.js b/packages/api/index.js index 5ce8e4579e8..44e3aba916b 100644 --- a/packages/api/index.js +++ b/packages/api/index.js @@ -14,6 +14,7 @@ exports.schemas = require('./endpoints/schemas'); exports.stats = require('./endpoints/stats'); exports.version = require('./endpoints/version'); exports.distribution = require('./endpoints/distribution'); +exports.dbIndexer = require('./lambdas/db-indexer'); exports.jobs = require('./lambdas/jobs'); exports.bootstrap = require('./lambdas/bootstrap').handler; diff --git a/packages/api/lambdas/db-indexer.js b/packages/api/lambdas/db-indexer.js new file mode 100644 index 00000000000..1ea58dddff3 --- /dev/null +++ b/packages/api/lambdas/db-indexer.js @@ -0,0 +1,82 @@ +'use strict'; + +const get = require('lodash.get'); +const pLimit = require('p-limit'); +const { AttributeValue } = require('dynamodb-data-types'); +const indexer = require('../es/indexer'); +const { Search } = require('../es/search'); +const unwrap = AttributeValue.unwrap; + +function indexRecord(esClient, record) { + // only process if the source is dynamoDB + if (record.eventSource !== 'aws:dynamodb') { + return Promise.resolve(); + } + + const stack = process.env.stackName; + + //determine whether the record should be indexed + const acceptedTables = ['Collection', 'Provider', 'Rule']; + const tableConfig = {} + acceptedTables.forEach((a) => { + tableConfig[`${stack}-${a}sTable`] = indexer[`index${a}`]; + }); + + let tableName = record.eventSourceARN.match(/table\/(.[^\/]*)/); + + const tableIndex = Object.keys(tableConfig).indexOf(tableName[1]); + if (!tableName || (tableName && tableIndex === -1)) { + return Promise.resolve(); + } + tableName = tableName[1]; + const currentTable = acceptedTables[tableIndex].toLowerCase(); + + // now get the hash and range (if any) and use them as id key for ES + const keys = 
unwrap(get(record, 'dynamodb.Keys')); + const body = unwrap(get(record, 'dynamodb.NewImage')); + const data = Object.assign({}, keys, body); + + if (record.eventName === 'REMOVE') { + let id; + const idKeys = Object.keys(keys); + if (idKeys.length > 1) { + id = indexer.constructCollectionId(...Object.values(keys)); + } + else { + id = keys[idKeys[0]]; + } + return indexer + .deleteRecord(esClient, id, currentTable) + // Important to catch this error. Uncaught errors will cause the handler to fail and other records will not be updated. + .catch(console.log); + } + return tableConfig[tableName](esClient, data); +} + +async function indexRecords(records) { + const concurrencyLimit = process.env.CONCURRENCY || 3 + const limit = pLimit(concurrencyLimit); + const esClient = await Search.es(); + + const promises = records.map((record) => limit( + () => indexRecord(esClient, record) + )); + return Promise.all(promises); +} + +/** + * Sync changes to dynamodb to an elasticsearch instance. + * Sending updates to this lambda is handled automatically by AWS. + * @param {Object} event - event whose Records property is a list of records with an eventName property signifying REMOVE or INSERT. + * @return {string} response text indicating the number of records altered in elasticsearch. 
+ */ +function handler(event, context, cb) { + const records = event.Records; + if (!records) { + return cb(null, 'No records found in event'); + } + + return indexRecords(records).then((r) => cb(null, r)).catch(cb); +} + +module.exports = handler; diff --git a/packages/api/models/base.js b/packages/api/models/base.js index 700ba4804fd..29f9ab1eb16 100644 --- a/packages/api/models/base.js +++ b/packages/api/models/base.js @@ -44,6 +44,10 @@ class Manager { ProvisionedThroughput: { ReadCapacityUnits: 5, WriteCapacityUnits: 5 + }, + StreamSpecification: { + StreamEnabled: true, + StreamViewType: 'NEW_IMAGE' } }; @@ -168,7 +172,7 @@ class Manager { Item: item }; - await this.dynamodbDocClient.put(params).promise(); + await aws.dynamodbDocClient().put(params).promise(); }; if (items instanceof Array) { diff --git a/packages/api/package.json b/packages/api/package.json index 909ef5b0fa3..9feedc1cae6 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -4,7 +4,7 @@ "description": "Lambda functions for handling all daac's API operations", "main": "index.js", "scripts": { - "test": "IS_LOCAL=true TEST=true ava tests/test-*.js --serial", + "test": "TEST=true ava tests/test-*.js --serial", "build": "webpack --progress", "watch": "webpack --progress -w", "postinstall": "npm run build" @@ -37,12 +37,14 @@ "@cumulus/ingest": "^1.1.1", "@cumulus/pvl": "^1.0.0", "ajv": "^5.2.2", + "archiver": "^2.1.1", "aws-sdk": "^2.95.0", "babel-core": "^6.25.0", "babel-loader": "^6.2.4", "babel-polyfill": "^6.23.0", "babel-preset-es2017": "^6.24.1", "basic-auth": "^1.1.0", + "dynamodb-data-types": "^3.0.0", "elasticsearch": "^13.2.0", "form-data": "^2.1.2", "got": "^6.7.1", @@ -60,7 +62,7 @@ "p-limit": "^1.1.0", "querystring": "^0.2.0", "uuid": "^3.2.1", - "webpack": "^1.12.13" + "webpack": "^1.15.0" }, "devDependencies": { "ava": "^0.21.0", diff --git a/packages/api/tests/test-db-indexer.js b/packages/api/tests/test-db-indexer.js new file mode 100644 index 
00000000000..60306c98221 --- /dev/null +++ b/packages/api/tests/test-db-indexer.js @@ -0,0 +1,151 @@ +'use strict'; + +const archiver = require('archiver'); +const aws = require('@cumulus/common/aws'); +const fs = require('fs'); +const path = require('path'); +const { randomString } = require('@cumulus/common/test-utils'); +const test = require('ava'); + +process.env.stackName = 'test-stack'; +process.env.internal = 'test-bucket'; +process.env.CollectionsTable = `${process.env.stackName}-CollectionsTable`; + +const bootstrap = require('../lambdas/bootstrap'); +const models = require('../models'); +const collections = new models.Collection(); +const EsCollection = require('../es/collections'); + +const testCollection = { + 'name': `collection-${randomString()}`, + 'version': '0.0.0', + 'provider_path': '/', + 'duplicateHandling': 'replace', + 'granuleId': '^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$', + 'granuleIdExtraction': '(MOD09GQ\\.(.*))\\.hdf', + 'sampleFileName': 'MOD09GQ.A2017025.h21v00.006.2017034065104.hdf', + 'files': [] +}; + +const collectionOnlyInDynamo = Object.assign({}, testCollection, { name: `collection-${randomString()}` }); + +const codeDirectory = 'dist/' +const tmpZipFile = path.join('/tmp/test.zip'); +const output = fs.createWriteStream(tmpZipFile) +const archive = archiver('zip', { + zlib: { level: 9 } +}); +const dbIndexerFnName = 'test-dbIndexer'; +const hash = { name: 'name', type: 'S' }; +const range = { name: 'version', type: 'S' }; + +/** + * TODO(aimee): This test works when running tests just for @cumulus/api, but not on all tests or CI. + * Running localstack on CI for this test requires: + * - built packages/api/dist/index.js (packages are not built for circle ci). This is fixable. 
+ * - A docker executor for lambdas, which is done in part by LAMBDA_EXECUTOR: docker as an env variable to the localstack/localstack docker image for ci + * But it still appears docker isn't running: `Cannot connect to the Docker daemon at unix:///var/run/docker.sock` +**/ +if (process.env.LOCALSTACK_HOST === 'localhost') { + // Test that if our dynamos are hooked up to the db-indexer lambda function, + // records show up in elasticsearch 'hooked-up': the dynamo has a stream and the + // lambda has an event source mapping to that dynamo stream. + test.skip.before(async () => { + await aws.s3().createBucket({ Bucket: process.env.internal }).promise(); + + // create collections table + await models.Manager.createTable(process.env.CollectionsTable, hash, range); + // create an object only in dynamo to test error condition + await collections.create(collectionOnlyInDynamo); + await bootstrap.bootstrapElasticSearch('http://localhost:4571'); + + // create the lambda function + await new Promise((resolve) => { + output.on('close', () => { + const contents = fs.readFileSync(tmpZipFile) + + aws.lambda().createFunction({ + FunctionName: dbIndexerFnName, + Runtime: 'nodejs6.10', + Handler: 'index.dbIndexer', // point to the db indexer + Role: 'testRole', + Code: { + ZipFile: contents + }, + Environment: { + Variables: { + 'TEST': 'true', + 'LOCALSTACK_HOST': process.env.DOCKERHOST, + 'stackName': process.env.stackName + } + } + }) + .promise() + .then((res) => { + fs.unlinkSync(tmpZipFile); + resolve(res); + }); + }); + + archive.pipe(output) + archive.directory(codeDirectory, false); + archive.finalize() + }) + .catch(console.log); + + //get the dynamo collections table stream arn and add it as an event source to the lambda + await new Promise((resolve, reject) => { + aws.dynamodbstreams().listStreams({TableName: process.env.CollectionsTable}, (err, data) => { + if (err) reject(err); + const collectionsTableStreamArn = data.Streams.find(s => s.TableName === 
'test-stack-CollectionsTable').StreamArn; + const eventSourceMappingParams = { + EventSourceArn: collectionsTableStreamArn, + FunctionName: dbIndexerFnName, + StartingPosition: 'TRIM_HORIZON', + BatchSize: 10 + }; + + aws.lambda().createEventSourceMapping(eventSourceMappingParams, (err, data) => { + if (err) reject(err); + resolve(data); + }); + }); + }) + .catch(console.log); + }); + + test.skip.after.always(async () => { + await models.Manager.deleteTable(process.env.CollectionsTable); + await aws.lambda().deleteFunction({FunctionName: dbIndexerFnName}).promise(); + await aws.recursivelyDeleteS3Bucket(process.env.internal); + }); + + test.skip('creates a collection in dynamodb and es', async t => { + const { name } = testCollection; + await collections.create(testCollection) + .then(() => { + const esCollection = new EsCollection({}); + return esCollection.query(); + }) + .then((result) => { + t.is(result.results[0].name, testCollection.name); + t.is(result.results[0].version, testCollection.version); + }) + .then(() => collections.delete({ name })) + .catch(console.log); + }); + + test.skip('thrown error is caught', async t => { + const { name } = collectionOnlyInDynamo; + await collections.delete({ name }) + .then((result) => { + t.is(result.results[0].name, testCollection.name); + t.is(result.results[0].version, testCollection.version); + }) + .catch(console.log); + }); +} else { + test('db-indexer TODO test', t => { + t.is(1+1, 2); + }); +} diff --git a/packages/api/tests/test-endpoints-collections.js b/packages/api/tests/test-endpoints-collections.js new file mode 100755 index 00000000000..49604510ccf --- /dev/null +++ b/packages/api/tests/test-endpoints-collections.js @@ -0,0 +1,123 @@ +'use strict'; + +const sinon = require('sinon'); +const test = require('ava'); + +process.env.CollectionsTable = 'Test_CollectionsTable'; +process.env.stackName = 'test-stack'; +process.env.internal = 'test-bucket'; + +const models = require('../models'); +const aws = 
require('@cumulus/common/aws'); +const bootstrap = require('../lambdas/bootstrap'); +const collectionsEndpoint = require('../endpoints/collections'); +const collections = new models.Collection(); +const EsCollection = require('../es/collections'); +const { testEndpoint } = require('./testUtils'); + +const testCollection = { + 'name': 'collection-125', + 'version': '0.0.0', + 'provider_path': '/', + 'duplicateHandling': 'replace', + 'granuleId': '^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$', + 'granuleIdExtraction': '(MOD09GQ\\.(.*))\\.hdf', + 'sampleFileName': 'MOD09GQ.A2017025.h21v00.006.2017034065104.hdf', + 'files': [] +}; + +const hash = { name: 'name', type: 'S' }; +const range = { name: 'version', type: 'S' }; + +async function setup() { + await bootstrap.bootstrapElasticSearch('http://localhost:4571'); + sinon.stub(EsCollection.prototype, 'getStats').returns([testCollection]); + await aws.s3().createBucket({ Bucket: process.env.internal }).promise(); + await models.Manager.createTable(process.env.CollectionsTable, hash, range); + await collections.create(testCollection); +} + +async function teardown() { + models.Manager.deleteTable(process.env.CollectionsTable); + await aws.recursivelyDeleteS3Bucket(process.env.internal); +} + +test.before(async () => setup()); +test.after.always(async () => teardown()); + +// TODO(aimee): Debug why this is _passing_ - we don't expect to already have a +// collection in ES. 
+test('default returns list of collections', t => { + const listEvent = { httpMethod: 'list' }; + return testEndpoint(collectionsEndpoint, listEvent, (response) => { + const { results } = JSON.parse(response.body); + t.is(results.length, 1); + }); +}); + +test('GET returns an existing collection', t => { + const getEvent = { + httpMethod: 'GET', + pathParameters: { + collectionName: testCollection.name, + version: testCollection.version + } + }; + return testEndpoint(collectionsEndpoint, getEvent, (response) => { + const { name } = JSON.parse(response.body); + t.is(name, testCollection.name); + }); +}); + +test('POST creates a new collection', t => { + const newCollection = Object.assign({}, testCollection, {name: 'collection-post'}); + const postEvent = { + httpMethod: 'POST', + body: JSON.stringify(newCollection) + }; + return testEndpoint(collectionsEndpoint, postEvent, (response) => { + const { message, record } = JSON.parse(response.body); + t.is(message, 'Record saved'); + t.is(record.name, newCollection.name); + }); +}); + +test('PUT updates an existing collection', t => { + const newPath = '/new_path'; + const updateEvent = { + body: JSON.stringify({ + name: testCollection.name, + version: testCollection.version, + provider_path: newPath + }), + pathParameters: { + collectionName: testCollection.name, + version: testCollection.version, + }, + httpMethod: 'PUT' + }; + return testEndpoint(collectionsEndpoint, updateEvent, (response) => { + const { provider_path } = JSON.parse(response.body); + t.is(provider_path, newPath); + }); +}); + +test('DELETE deletes an existing collection', t => { + const deleteEvent = { + httpMethod: 'DELETE', + pathParameters: { + collectionName: testCollection.name, + version: testCollection.version, + } + }; + return testEndpoint(collectionsEndpoint, deleteEvent, (response) => { + const { message } = JSON.parse(response.body); + t.is(message, 'Record deleted'); + }); +}); + +test.todo('GET returns existing collection'); 
+test.todo('POST without name and version returns error message'); +test.todo('PUT with invalid name and version returns error message'); +// Multiple tests +test.todo('Test methods return not found'); diff --git a/packages/api/tests/test-endpoints-providers.js b/packages/api/tests/test-endpoints-providers.js new file mode 100644 index 00000000000..a404199a773 --- /dev/null +++ b/packages/api/tests/test-endpoints-providers.js @@ -0,0 +1,95 @@ +'use strict'; + +const aws = require('@cumulus/common/aws'); +const test = require('ava'); + +process.env.ProvidersTable = 'Test_ProviderTable'; +process.env.stackName = 'test-stack'; +process.env.internal = 'test-bucket'; + +const bootstrap = require('../lambdas/bootstrap'); +const models = require('../models'); +const providerEndpoint = require('../endpoints/providers'); +const { testEndpoint } = require('./testUtils'); +const providers = new models.Provider(); + +const testProvider = { + id: 'orbiting-carbon-observatory-2', + globalConnectionLimit: 1, + protocol: 'http', + host: 'https://oco.jpl.nasa.gov/', + port: 80 +}; +const keyId = 'public.pub'; + +const hash = { name: 'id', type: 'S' }; + +async function setup() { + await bootstrap.bootstrapElasticSearch('http://localhost:4571'); + await models.Manager.createTable(process.env.ProvidersTable, hash); + await providers.create(testProvider); +} + +async function teardown() { + await models.Manager.deleteTable(process.env.ProvidersTable); +} + +test.before(async () => setup()); +test.after.always(async () => teardown()); + +// TODO(aimee): Add a provider to ES. List uses ES and we don't have any providers in ES. 
+test('default returns list of providers', t => { + const listEvent = { httpMethod: 'list' }; + return testEndpoint(providerEndpoint, listEvent, (response) => { + const { results } = JSON.parse(response.body); + t.is(results.length, 0); + }); +}); + +test('GET returns an existing provider', t => { + const getEvent = { + httpMethod: 'GET', + pathParameters: { id: testProvider.id } + }; + return testEndpoint(providerEndpoint, getEvent, (response) => { + t.is(JSON.parse(response.body).id, testProvider.id); + }); +}); + +test('POST creates a new provider', t => { + const newProviderId = 'AQUA'; + const newProvider = Object.assign({}, testProvider, { id: newProviderId }); + const postEvent = { + httpMethod: 'POST', + body: JSON.stringify(newProvider) + }; + return testEndpoint(providerEndpoint, postEvent, (response) => { + const { message, record } = JSON.parse(response.body); + t.is(message, 'Record saved'); + t.is(record.id, newProviderId); + }); +}); + +test('PUT updates an existing provider', t => { + const updatedLimit = 2; + const putEvent = { + httpMethod: 'PUT', + pathParameters: { id: testProvider.id }, + body: JSON.stringify({ globalConnectionLimit: updatedLimit }) + }; + return testEndpoint(providerEndpoint, putEvent, (response) => { + const { globalConnectionLimit } = JSON.parse(response.body); + t.is(globalConnectionLimit, updatedLimit); + }); +}); + +test('DELETE deletes an existing provider', t => { + const deleteEvent = { + httpMethod: 'DELETE', + pathParameters: { id: testProvider.id } + }; + return testEndpoint(providerEndpoint, deleteEvent, (response) => { + const { message } = JSON.parse(response.body); + t.is(message, 'Record deleted'); + }); +}); diff --git a/packages/api/tests/test-endpoints-rules.js b/packages/api/tests/test-endpoints-rules.js new file mode 100644 index 00000000000..21d7a849cf5 --- /dev/null +++ b/packages/api/tests/test-endpoints-rules.js @@ -0,0 +1,117 @@ +'use strict'; + +const test = require('ava'); + +process.env.RulesTable 
= 'Test_RulesTable'; +process.env.stackName = 'test-stack'; +process.env.bucket = 'test-bucket'; +const workflowName = 'morning-routine'; +const workflowfile = `${process.env.stackName}/workflows/${workflowName}.json`; + +const aws = require('@cumulus/common/aws'); +const bootstrap = require('../lambdas/bootstrap'); +const models = require('../models'); +const rulesEndpoint = require('../endpoints/rules'); +const { testEndpoint } = require('./testUtils'); + +const rules = new models.Rule(); + +const testRule = { + name: 'make_coffee', + workflow: workflowName, + provider: 'whole-foods', + collection: { + name: 'compass', + version: '0.0.0' + }, + rule: { + type: 'onetime' + }, + state: 'DISABLED' +}; + +const hash = { name: 'name', type: 'S' }; +async function setup() { + await bootstrap.bootstrapElasticSearch('http://localhost:4571'); + await aws.s3().createBucket({ Bucket: process.env.bucket }).promise(); + await aws.s3().putObject({ + Bucket: process.env.bucket, + Key: workflowfile, + Body: 'test data' + }).promise(); + await models.Manager.createTable(process.env.RulesTable, hash); + await rules.create(testRule); +} + +async function teardown() { + models.Manager.deleteTable(process.env.RulesTable); + await aws.recursivelyDeleteS3Bucket(process.env.bucket); +} + +test.before(async () => setup()); +test.after.always(async () => teardown()); + +// TODO(aimee): Add a rule to ES. List uses ES and we don't have any rules in ES. 
+test('default returns list of rules', t => { + const listEvent = { httpMethod: 'list ' }; + return testEndpoint(rulesEndpoint, listEvent, (response) => { + const { results } = JSON.parse(response.body); + t.is(results.length, 0); + }); +}); + +test('GET gets a rule', t => { + const getEvent = { + pathParameters: { + name: testRule.name + }, + httpMethod: 'GET' + }; + return testEndpoint(rulesEndpoint, getEvent, (response) => { + const { name } = JSON.parse(response.body); + t.is(name, testRule.name); + }); +}); + +test('POST creates a rule', t => { + const newRule = Object.assign({}, testRule, {name: 'make_waffles'}); + const postEvent = { + httpMethod: 'POST', + body: JSON.stringify(newRule) + }; + return testEndpoint(rulesEndpoint, postEvent, (response) => { + const { message, record } = JSON.parse(response.body); + t.is(message, 'Record saved'); + t.is(record.name, newRule.name); + }); +}); + +test('PUT updates a rule', t => { + const updateEvent = { + body: JSON.stringify({state: 'ENABLED'}), + pathParameters: { + name: testRule.name + }, + httpMethod: 'PUT' + }; + return testEndpoint(rulesEndpoint, updateEvent, (response) => { + const { state } = JSON.parse(response.body); + t.is(state, 'ENABLED'); + }); +}); + +test('DELETE deletes a rule', t => { + const deleteEvent = { + pathParameters: { + name: testRule.name + }, + httpMethod: 'DELETE' + }; + return testEndpoint(rulesEndpoint, deleteEvent, (response) => { + const { message } = JSON.parse(response.body); + t.is(message, 'Record deleted'); + }); +}); + +test.todo('POST returns a record exists when one exists'); + diff --git a/packages/api/tests/testUtils.js b/packages/api/tests/testUtils.js new file mode 100644 index 00000000000..17b1dfb2810 --- /dev/null +++ b/packages/api/tests/testUtils.js @@ -0,0 +1,12 @@ +'use strict'; + +function testEndpoint(endpoint, event, testCallback) { + return new Promise((resolve, reject) => { + endpoint(event, { + succeed: response => resolve(testCallback(response)), + fail: 
+exports.cloudwatchevents = awsClient(AWS.CloudWatchEvents, '2015-10-07');
+  {{@key}}DynamoDBStreamArn:
+    Type: String
+    Description: '{{@key}} Table Stream Arn'
-    const cwevents = new AWS.CloudWatchEvents();
+    const cwevents = aws.cloudwatchevents();