diff --git a/.eslint-ratchet-high-water-mark b/.eslint-ratchet-high-water-mark index 767efdbde5b..23ca55f5b2a 100644 --- a/.eslint-ratchet-high-water-mark +++ b/.eslint-ratchet-high-water-mark @@ -1 +1 @@ -1306 +1145 diff --git a/.eslintrc.json b/.eslintrc.json index b63e3371ba3..6af5fea1cd2 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -51,16 +51,7 @@ "generator-star-spacing": "off", "import/no-extraneous-dependencies": "off", "import/newline-after-import": "off", - "no-warning-comments": [ - 2, - { - "terms": [ - "TODO", - "fixme" - ], - "location": "anywhere" - } - ], + "no-warning-comments": "off", "no-unused-vars": [ "error", { "argsIgnorePattern": "^_" } @@ -97,6 +88,7 @@ "ignorePattern": "(https?:|JSON\\.parse|[Uu]rl =)" } ], - "arrow-parens": ["error", "always"] + "arrow-parens": ["error", "always"], + "prefer-destructuring": "off" } } diff --git a/CHANGELOG.md b/CHANGELOG.md index e829075535d..7b4f91dea25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,35 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - fixed pdr-status-check schema, the failed execution contains arn and reason - **CUMULUS-206** make sure homepage and repository urls exist in package.json files of tasks and packages +### Changed +- [CUMULUS-450](https://bugs.earthdata.nasa.gov/browse/CUMULUS-450) - Updated + the config schema of the **queue-granules** task + - The config no longer takes a "collection" property + - The config now takes an "internalBucket" property + - The config now takes a "stackName" property +- [CUMULUS-450](https://bugs.earthdata.nasa.gov/browse/CUMULUS-450) - Updated + the config schema of the **parse-pdr** task + - The config no longer takes a "buckets" property + - The config no longer takes a "collection" property + - The config now takes an "internalBucket" property + - The "stack", "provider", and "internalBucket" config properties are now + required + +### Removed +- Removed the `findTmpTestDataDirectory()` function from + `@cumulus/common/test-utils` + +### Fixed +- [CUMULUS-450](https://bugs.earthdata.nasa.gov/browse/CUMULUS-450) + - The **queue-granules** task now enqueues a **sync-granule** task with the + correct collection config for that granule based on the granule's + data-type. It had previously been using the collection config from the + config of the **queue-granules** task, which was a problem if the granules + being queued belonged to different data-types. + - The **parse-pdr** task now handles the case where a PDR contains granules + with different data types, and uses the correct granuleIdExtraction for + each granule. + ### Added - **CUMULUS-448** Add code coverage checking using [nyc](https://github.com/istanbuljs/nyc). 
- **CUMULUS-469** Added a lambda to the API package to prototype creating an S3 bucket policy for direct, in-region S3 access for the prototype bucket diff --git a/packages/api/models/collections.js b/packages/api/models/collections.js index dade1517021..f4277520445 100644 --- a/packages/api/models/collections.js +++ b/packages/api/models/collections.js @@ -1,5 +1,6 @@ 'use strict'; +const { CollectionConfigStore } = require('@cumulus/common'); const { S3 } = require('@cumulus/ingest/aws'); const Manager = require('./base'); const collectionSchema = require('./schemas').collection; @@ -44,9 +45,9 @@ class Collection extends Manager { } async create(item) { - // write the record to S3 - const key = `${process.env.stackName}/collections/${item.name}.json`; - await S3.put(process.env.internal, key, JSON.stringify(item)); + const collectionConfigStore = + new CollectionConfigStore(process.env.internal, process.env.stackName); + await collectionConfigStore.put(item.name, item); return super.create(item); } diff --git a/packages/common/collection-config-store.js b/packages/common/collection-config-store.js new file mode 100644 index 00000000000..4c641e7f22b --- /dev/null +++ b/packages/common/collection-config-store.js @@ -0,0 +1,87 @@ +'use strict'; + +const { s3 } = require('./aws'); + +/** + * Store and retrieve collection configs in S3 + */ +class CollectionConfigStore { + /** + * Initialize a CollectionConfigStore instance + * + * @param {string} bucket - the bucket where collection configs are stored + * @param {string} stackName - the Cumulus deployment stack name + */ + constructor(bucket, stackName) { + this.bucket = bucket; + this.stackName = stackName; + this.cache = {}; + } + + /** + * Fetch a collection config from S3 (or cache if available) + * + * @param {string} dataType - the name of the collection config to fetch + * @returns {Object} the fetched collection config + */ + async get(dataType) { + // Check to see if the collection config has already been cached + if (!this.cache[dataType]) { + let response; + try { + // Attempt to fetch the collection config from S3 + response = await s3().getObject({ + Bucket: this.bucket, + Key: this.configKey(dataType) + }).promise(); + } + catch (err) { + if (err.code === 'NoSuchKey') { + throw new Error(`A collection config for data type "${dataType}" was not found.`); + } + + if (err.code === 'NoSuchBucket') { + throw new Error(`Collection config bucket does not exist: ${this.bucket}`); + } + + throw err; + } + + // Store the fetched collection config to the cache + this.cache[dataType] = JSON.parse(response.Body.toString()); + } + + return this.cache[dataType]; + } + + /** + * Store a collection config to S3 + * + * @param {string} dataType - the name of the collection config to store + * @param {Object} config - the collection config to store + * @returns {Promise} resolves when the collection config has been written + * to S3 + */ + async put(dataType, config) { + this.cache[dataType] = config; + + return s3().putObject({ + Bucket: this.bucket, + Key: this.configKey(dataType), + Body: JSON.stringify(config) + }).promise().then(() => null); // Don't leak implementation details to the caller + } + + /** + * Return the S3 key pointing to the collection config + * + * @param {string} dataType - the datatype + * @returns {string} the S3 key where the collection config is located + * + * @private + */ + configKey(dataType) { + return `${this.stackName}/collections/${dataType}.json`; + } +} +module.exports = CollectionConfigStore;
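// Usage sketch (illustrative only; the bucket and stack names below are
// hypothetical, not part of the commit above):
//
//   const { CollectionConfigStore } = require('@cumulus/common');
//   const store = new CollectionConfigStore('my-internal-bucket', 'my-stack');
//   await store.put('MOD09GQ', { granuleIdExtraction: '^(.*)\\.hdf' });
//   const config = await store.get('MOD09GQ'); // served from the cache after the first fetch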
diff --git a/packages/common/index.js b/packages/common/index.js index ec945a04853..97f9214ab08 100644 --- a/packages/common/index.js +++ b/packages/common/index.js @@ -3,3 +3,4 @@ exports.log = require('./log'); exports.aws = require('./aws'); exports.task = require('./task'); +exports.CollectionConfigStore = require('./collection-config-store'); diff --git a/packages/common/package.json b/packages/common/package.json index 1087f4b4f4f..b6a5605e164 100644 --- a/packages/common/package.json +++ b/packages/common/package.json @@ -16,6 +16,7 @@ "url": "https://github.com/cumulus-nasa/cumulus" }, "scripts": { + "build": "true", "lint": "eslint .", "mocha": "mocha-webpack test/**/*.js", "mocha-coverage": "nyc mocha-webpack test/**/*.js", @@ -58,7 +59,7 @@ "ajv-cli": "^1.1.1", "async": "^2.0.0", "aws-sdk": "^2.4.11", - "babel-core": "^6.13.2", + "babel-core": "^6.26.0", "babel-eslint": "^8.2.2", "babel-polyfill": "^6.13.0", "babel-preset-env": "^1.6.1", @@ -87,7 +88,6 @@ "webpack-node-externals": "^1.5.4" }, "devDependencies": { - "ajv": "^5.5.2", "ava": "^0.25.0", "nyc": "^11.6.0" } diff --git a/packages/common/test-utils.js b/packages/common/test-utils.js index cab19fee7bf..7b12203d06d 100644 --- a/packages/common/test-utils.js +++ b/packages/common/test-utils.js @@ -177,17 +177,6 @@ async function findGitRepoRootDirectory(dirname) { } exports.findGitRepoRootDirectory = findGitRepoRootDirectory; -/** - * Determine the path of the .tmp-test-data directory - * - * @returns {Promise.<string>} - the filesystem path of the .tmp-test-data directory - */ -function findTmpTestDataDirectory() { - return exports.findGitRepoRootDirectory(process.cwd()) - .then((gitRepoRoot) => path.join(gitRepoRoot, '.tmp-test-data')); -} -exports.findTmpTestDataDirectory = findTmpTestDataDirectory; - /** * Determine the path of the packages/test-data directory * diff --git a/packages/common/tests/.eslintrc.json b/packages/common/tests/.eslintrc.json new file mode 100644 index 00000000000..ada42bca77f --- /dev/null +++ b/packages/common/tests/.eslintrc.json @@ -0,0 +1,5 @@ +{ + "rules": { + "no-param-reassign": "off" + } +} diff --git a/packages/common/tests/collection-config-store.js b/packages/common/tests/collection-config-store.js new file mode 100644 index 00000000000..420e29b1b14 --- /dev/null +++ b/packages/common/tests/collection-config-store.js @@ -0,0 +1,115 @@ +'use strict'; + +const test = require('ava'); +const { recursivelyDeleteS3Bucket, s3 } = require('../aws'); +const { randomString } = require('../test-utils'); +const CollectionConfigStore = require('../collection-config-store'); + +test.beforeEach(async (t) => { + t.context.stackName = randomString(); + t.context.dataType = randomString(); + t.context.collectionConfig = { name: randomString() }; + + t.context.bucket = randomString(); + await s3().createBucket({ Bucket: t.context.bucket }).promise(); + + // Utility function to return the S3 key of a collection config + t.context.collectionConfigKey = (dataType) => + `${t.context.stackName}/collections/${dataType}.json`; +}); + +test.afterEach(async (t) => { + try { + await recursivelyDeleteS3Bucket(t.context.bucket); + } + catch (err) { + // Some tests delete the bucket before this "afterEach" hook is run + if (err.code !== 'NoSuchBucket') throw err; + } +}); + +test('get() fetches a collection config from S3', async (t) => { + await s3().putObject({ + Bucket: t.context.bucket, + Key: t.context.collectionConfigKey(t.context.dataType), + Body: JSON.stringify(t.context.collectionConfig) +
}).promise(); + + const collectionConfigStore = new CollectionConfigStore(t.context.bucket, t.context.stackName); + const fetchedCollectionConfig = await collectionConfigStore.get(t.context.dataType); + + t.deepEqual(fetchedCollectionConfig, t.context.collectionConfig); +}); + +test('get() does not hit S3 for a cached collection config', async (t) => { + await s3().putObject({ + Bucket: t.context.bucket, + Key: t.context.collectionConfigKey(t.context.dataType), + Body: JSON.stringify(t.context.collectionConfig) + }).promise(); + + const collectionConfigStore = new CollectionConfigStore(t.context.bucket, t.context.stackName); + + // Fetch the collection config once so it's in the cache + await collectionConfigStore.get(t.context.dataType); + + // Delete the S3 bucket so the config can't be fetched from S3 + await recursivelyDeleteS3Bucket(t.context.bucket); + + // This get() should use the cache + const fetchedCollectionConfig = await collectionConfigStore.get(t.context.dataType); + + t.deepEqual(fetchedCollectionConfig, t.context.collectionConfig); +}); + +test('get() throws an exception if the collection config could not be found', async (t) => { + const invalidDataType = randomString(); + const collectionConfigStore = new CollectionConfigStore(t.context.bucket, t.context.stackName); + + try { + await collectionConfigStore.get(invalidDataType); + t.fail('Expected an error to be thrown'); + } + catch (err) { + t.is(err.message, `A collection config for data type "${invalidDataType}" was not found.`); + } +}); + +test('get() throws an exception if the bucket does not exist', async (t) => { + const invalidBucket = randomString(); + const collectionConfigStore = new CollectionConfigStore(invalidBucket, t.context.stackName); + + try { + await collectionConfigStore.get(t.context.dataType); + t.fail('Expected an error to be thrown'); + } + catch (err) { + t.is(err.message, `Collection config bucket does not exist: ${invalidBucket}`); + } +}); + +test('put() stores a collection config to S3', async (t) => { + const collectionConfigStore = new CollectionConfigStore(t.context.bucket, t.context.stackName); + await collectionConfigStore.put(t.context.dataType, t.context.collectionConfig); + + const getObjectResponse = await s3().getObject({ + Bucket: t.context.bucket, + Key: t.context.collectionConfigKey(t.context.dataType) + }).promise(); + + const storedCollectionConfig = JSON.parse(getObjectResponse.Body.toString()); + t.deepEqual(storedCollectionConfig, t.context.collectionConfig); +}); + +test('put() updates the cache with the new collection config', async (t) => { + const collectionConfigStore = new CollectionConfigStore(t.context.bucket, t.context.stackName); + await collectionConfigStore.put(t.context.dataType, t.context.collectionConfig); + + // Delete the S3 bucket so the config can't be fetched from S3 + await recursivelyDeleteS3Bucket(t.context.bucket); + + // This get() should use the cache + const fetchedCollectionConfig = await collectionConfigStore.get(t.context.dataType); + + t.deepEqual(fetchedCollectionConfig, t.context.collectionConfig); +}); diff --git a/packages/ingest/granule.js b/packages/ingest/granule.js index 6587d1defbb..18a1b309e2e 100644 --- a/packages/ingest/granule.js +++ b/packages/ingest/granule.js @@ -9,10 +9,10 @@ const urljoin = require('url-join'); const cksum = require('cksum'); const checksum = require('checksum'); const errors = require('@cumulus/common/errors'); -const sftpMixin = require('./sftp'); -const ftpMixin = require('./ftp').ftpMixin; -const 
httpMixin = require('./http').httpMixin; -const s3Mixin = require('./s3').s3Mixin; +const { sftpMixin } = require('./sftp'); +const { ftpMixin } = require('./ftp'); +const { httpMixin } = require('./http'); +const { s3Mixin } = require('./s3'); const { baseProtocol } = require('./protocol'); class Discover { diff --git a/packages/ingest/parse-pdr.js b/packages/ingest/parse-pdr.js index 53b70951443..b617206ce1d 100644 --- a/packages/ingest/parse-pdr.js +++ b/packages/ingest/parse-pdr.js @@ -73,6 +73,15 @@ function parseSpec(pdrName, spec) { } module.exports.parseSpec = parseSpec; +/** + * Extract a granuleId from a filename + * + * @param {string} fileName - The filename to extract the granuleId from + * @param {RegExp|string} regex - A regular expression describing how to extract + * a granuleId from a filename + * @returns {string} the granuleId or the name of the file if no granuleId was + * found + */ function extractGranuleId(fileName, regex) { const test = new RegExp(regex); const match = fileName.match(test); @@ -83,17 +92,18 @@ function extractGranuleId(fileName, regex) { return fileName; } -module.exports.parsePdr = async function parsePdr(pdrFilePath, collection, pdrName) { - // then read the file and and pass it to parser +/** + * Load a PDR file from disk and parse the PVL document + * + * @param {string} pdrFilePath - the file to be loaded + * @returns {Object} a parsed PVL document + */ +async function loadPdrFile(pdrFilePath) { const pdrFile = await fs.readFile(pdrFilePath, 'utf8'); // Investigating CUMULUS-423 if (pdrFile.trim().length === 0) throw new Error(`PDR file had no contents: ${pdrFilePath}`); - const obj = { - granules: [] - }; - // because MODAPS PDRs do not follow the standard ODL spec // we have to make sure there are spaces before and after every // question mark @@ -110,61 +120,65 @@ module.exports.parsePdr = async function parsePdr(pdrFilePath, collection, pdrNa throw new PDRParsingError(e.message); } - // check if the PDR has groups - // if so, get the objects inside the first group - // TODO: handle cases where there are more than one group - const groups = parsed.groups(); - if (groups.length > 0) { - parsed = groups[0]; - } + return parsed; +} - // Get all the file groups - const fileGroups = parsed.objects('FILE_GROUP'); +/** + * Build a granule from a PDR FILE_GROUP, using the collection config for the + * group's DATA_TYPE to determine how the granuleId is extracted + * + * @param {Object} fileGroup - a FILE_GROUP object from a parsed PVL document + * @param {string} pdrName - the name of the PDR containing this file group + * @param {CollectionConfigStore} collectionConfigStore - the collection config store + * @returns {Promise.<Object>} a granule with its dataType, files, granuleId, and granuleSize + */ +async function granuleFromFileGroup(fileGroup, pdrName, collectionConfigStore) { + if (!fileGroup.get('DATA_TYPE')) throw new PDRParsingError('DATA_TYPE is missing'); + const dataType = fileGroup.get('DATA_TYPE').value; - for (const group of fileGroups) { - // get all the file specs in each group - const specs = group.objects('FILE_SPEC'); - let dataType = group.get('DATA_TYPE'); + // get all the file specs in each group + const specs = fileGroup.objects('FILE_SPEC'); + if (specs.length === 0) throw new Error('No FILE_SPEC sections found.'); - if (!dataType) throw new PDRParsingError('DATA_TYPE is missing'); + const files = specs.map(parseSpec.bind(null, pdrName)); - dataType = dataType.value; + const collectionConfig = await collectionConfigStore.get(dataType); - // FIXME This is a very generic error - if (specs.length === 0) { - throw new Error(); - } + return { + dataType, + files, + granuleId: extractGranuleId(files[0].name, collectionConfig.granuleIdExtraction), + granuleSize: files.reduce((total, file) => total + file.fileSize, 0) + }; +} - const files = specs.map(parseSpec.bind(null, pdrName)); - const granuleId = 
extractGranuleId(files[0].name, collection.granuleIdExtraction); +/** + * Parse a PDR file + * + * @param {string} pdrFilePath - the file to be parsed + * @param {CollectionConfigStore} collectionConfigStore - the store used to fetch collection configs + * @param {string} pdrName - the name of the PDR + * @returns {Promise} an object representing the PDR + */ +exports.parsePdr = async function parsePdr(pdrFilePath, collectionConfigStore, pdrName) { + let pdrDocument = await loadPdrFile(pdrFilePath); - obj.granules.push({ - granuleId, - dataType, - granuleSize: files.reduce((total, file) => total + file.fileSize, 0), - files - }); - } + // check if the PDR has groups + // if so, get the objects inside the first group + // FIXME handle cases where there is more than one group + const groups = pdrDocument.groups(); + if (groups.length > 0) pdrDocument = groups[0]; // eslint-disable-line prefer-destructuring - // check file count - const fileCount = obj.granules.reduce((t, g) => t + g.files.length, 0); - const expectedFileCount = parsed.get('TOTAL_FILE_COUNT').value; - if (fileCount !== expectedFileCount) { - throw new PDRParsingError('FILE COUNT doesn\'t match expected file count'); - } + // Get all the file groups + const fileGroups = pdrDocument.objects('FILE_GROUP'); - obj.granulesCount = fileGroups.length; - obj.filesCount = fileCount; - obj.totalSize = obj.granules.reduce((t, g) => t + g.granuleSize, 0); + const promisedGranules = fileGroups.map((fileGroup) => + granuleFromFileGroup(fileGroup, pdrName, collectionConfigStore)); + const granules = await Promise.all(promisedGranules); - // Example object produced - // { - // "path": "/TEST_B/Cumulus/DATA/ID1612101200", - // "filename": "pg-PR1A0000-2016121001_000_002", - // "fileSize": 5975257, - // "checksumType": "CKSUM", - // "checksumValue": 299257224 - // } + // check file count + const filesCount = granules.reduce((total, granule) => total + granule.files.length, 0); + const expectedFileCount = pdrDocument.get('TOTAL_FILE_COUNT').value; + if (filesCount !== expectedFileCount) { + throw new PDRParsingError("FILE COUNT doesn't match expected file count"); + } - return obj; + return { + filesCount, + granules, + granulesCount: granules.length, + totalSize: granules.reduce((total, granule) => total + granule.granuleSize, 0) + }; }; diff --git a/packages/ingest/pdr.js b/packages/ingest/pdr.js index 28b5a9f9372..6ffecf307f2 100644 --- a/packages/ingest/pdr.js +++ b/packages/ingest/pdr.js @@ -2,15 +2,16 @@ const aws = require('@cumulus/common/aws'); const fs = require('fs-extra'); -const ftpMixin = require('./ftp').ftpMixin; +const { ftpMixin } = require('./ftp'); const get = require('lodash.get'); -const httpMixin = require('./http').httpMixin; +const { httpMixin } = require('./http'); const log = require('@cumulus/common/log'); -const parsePdr = require('./parse-pdr').parsePdr; +const { parsePdr } = require('./parse-pdr'); const path = require('path'); -const s3Mixin = require('./s3').s3Mixin; -const sftpMixin = require('./sftp'); +const { s3Mixin } = require('./s3'); +const { sftpMixin } = require('./sftp'); const { baseProtocol } = require('./protocol'); +const { CollectionConfigStore } = require('@cumulus/common'); /** * This is a base class for discovering PDRs * @@ -26,7 +27,7 @@ class Discover { collection, provider, useList = false, - folder = 'pdrs', + folder = 'pdrs' ) { if (this.constructor === Discover) { throw new TypeError('Can not construct abstract class.'); } @@ -49,7 +50,8 @@ class Discover { /** * discover PDRs from an endpoint - * @return 
{Promise} + * + * @returns {Promise.<Array>} - resolves to a list of the newly-discovered PDRs + * @public */ async discover() { @@ -104,10 +106,10 @@ class Parse { pdr, stack, bucket, - collection, provider, useList = false, - folder = 'pdrs') { + folder = 'pdrs' + ) { if (this.constructor === Parse) { throw new TypeError('Can not construct abstract class.'); } @@ -115,7 +117,6 @@ class Parse { this.pdr = pdr; this.stack = stack; this.bucket = bucket; - this.collection = collection; this.provider = provider; this.folder = folder; this.useList = useList; @@ -175,14 +176,13 @@ class Parse { /** * This method parses a PDR and returns all the granules in it * - * @param {string} pdrLocalPath PDR path on disk - * @return {Promise} + * @param {string} pdrLocalPath - PDR path on disk + * @returns {Promise} the parsed PDR * @public */ async parse(pdrLocalPath) { - // catching all parse errors here to mark the pdr as failed - // if any error occured - const parsed = await parsePdr(pdrLocalPath, this.collection, this.pdr.name); + const collectionConfigStore = new CollectionConfigStore(this.bucket, this.stack); + const parsed = await parsePdr(pdrLocalPath, collectionConfigStore, this.pdr.name); // each group represents a Granule record. // After adding all the files in the group to the Queue diff --git a/packages/ingest/sftp.js b/packages/ingest/sftp.js index 63e6f6229ad..cb5a657f633 100644 --- a/packages/ingest/sftp.js +++ b/packages/ingest/sftp.js @@ -1,14 +1,13 @@ 'use strict'; -const Client = require('ssh2').Client; -const join = require('path').join; -const log = require('@cumulus/common/log'); +const { Client } = require('ssh2'); +const { join } = require('path'); +const { log } = require('@cumulus/common'); const Crypto = require('./crypto').DefaultProvider; const recursion = require('./recursion'); - const { omit } = require('lodash'); -module.exports = (superclass) => class extends superclass { +module.exports.sftpMixin = (superclass) => class extends superclass { constructor(...args) { super(...args); diff --git a/packages/ingest/test/parse-pdr.js b/packages/ingest/test/parse-pdr.js new file mode 100644 index 00000000000..1863d29e7b8 --- /dev/null +++ b/packages/ingest/test/parse-pdr.js @@ -0,0 +1,124 @@ +'use strict'; + +const test = require('ava'); +const path = require('path'); +const { + findTestDataDirectory, + randomString +} = require('@cumulus/common/test-utils'); +const { CollectionConfigStore } = require('@cumulus/common'); +const { parsePdr } = require('../parse-pdr'); +const { + aws: { + recursivelyDeleteS3Bucket, + s3 + } +} = require('@cumulus/common'); + +test.beforeEach(async (t) => { + t.context.internalBucket = `internal-bucket-${randomString().slice(0, 6)}`; + t.context.stackName = `stack-${randomString().slice(0, 6)}`; + t.context.collectionConfigStore = + new CollectionConfigStore(t.context.internalBucket, t.context.stackName); + + await s3().createBucket({ Bucket: t.context.internalBucket }).promise(); +}); + +test.afterEach(async (t) => { + await Promise.all([ + recursivelyDeleteS3Bucket(t.context.internalBucket) + ]); +}); + +test('parse-pdr properly parses a simple PDR file', async (t) => { + const testDataDirectory = await findTestDataDirectory(); + const pdrFilename = path.join(testDataDirectory, 'pdrs', 'MOD09GQ.PDR'); + + const pdrName = `${randomString()}.PDR`; + + const collectionConfig = { granuleIdExtraction: '^(.*)\.hdf' }; + await t.context.collectionConfigStore.put('MOD09GQ', collectionConfig); + + const result = await parsePdr(pdrFilename, t.context.collectionConfigStore, pdrName); + 
t.is(result.filesCount, 2); + t.is(result.granulesCount, 1); + t.is(result.granules.length, 1); + t.is(result.totalSize, 17909733); + + const granule = result.granules[0]; + t.is(granule.dataType, 'MOD09GQ'); + + const hdfFile = result.granules[0].files.find((file) => file.name === 'MOD09GQ.A2017224.h09v02.006.2017227165020.hdf'); // eslint-disable-line max-len + t.truthy(hdfFile); + t.is(hdfFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(hdfFile.fileSize, 17865615); + t.is(hdfFile.checksumType, 'CKSUM'); + t.is(hdfFile.checksumValue, 4208254019); + + const metFile = result.granules[0].files.find((file) => file.name === 'MOD09GQ.A2017224.h09v02.006.2017227165020.hdf.met'); // eslint-disable-line max-len + t.truthy(metFile); + t.is(metFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(metFile.fileSize, 44118); +}); + +test('parse-pdr properly parses PDR with granules of different data-types', async (t) => { + const testDataDirectory = await findTestDataDirectory(); + const pdrFilename = path.join(testDataDirectory, 'pdrs', 'multi-data-type.PDR'); + + const pdrName = `${randomString()}.PDR`; + + const mod09CollectionConfig = { + granuleIdExtraction: '^(.*)\.hdf' + }; + + const mod87CollectionConfig = { + granuleIdExtraction: '^PENS-(.*)\.hdf' + }; + + await Promise.all([ + t.context.collectionConfigStore.put('MOD09GQ', mod09CollectionConfig), + t.context.collectionConfigStore.put('MOD87GQ', mod87CollectionConfig) + ]); + + const result = await parsePdr(pdrFilename, t.context.collectionConfigStore, pdrName); + + t.is(result.filesCount, 4); + t.is(result.granulesCount, 2); + t.is(result.granules.length, 2); + t.is(result.totalSize, 35819466); + + const mod09Granule = result.granules.find((granule) => granule.dataType === 'MOD09GQ'); + t.truthy(mod09Granule); + t.is(mod09Granule.granuleId, 'MOD09GQ.A2017224.h09v02.006.2017227165020'); + t.is(mod09Granule.granuleSize, 17909733); + + const mod09HdfFile = mod09Granule.files.find((file) => file.name === 'MOD09GQ.A2017224.h09v02.006.2017227165020.hdf'); // eslint-disable-line max-len + t.truthy(mod09HdfFile); + t.is(mod09HdfFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(mod09HdfFile.fileSize, 17865615); + t.is(mod09HdfFile.checksumType, 'CKSUM'); + t.is(mod09HdfFile.checksumValue, 4208254019); + + const mod09MetFile = mod09Granule.files.find((file) => file.name === 'MOD09GQ.A2017224.h09v02.006.2017227165020.hdf.met'); // eslint-disable-line max-len + t.truthy(mod09MetFile); + t.is(mod09MetFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(mod09MetFile.fileSize, 44118); + + const mod87Granule = result.granules.find((granule) => granule.dataType === 'MOD87GQ'); + t.truthy(mod87Granule); + t.is(mod87Granule.granuleId, 'MOD87GQ.A2017224.h09v02.006.2017227165020'); + t.is(mod87Granule.granuleSize, 17909733); + + const mod87HdfFile = mod87Granule.files.find((file) => file.name === 'PENS-MOD87GQ.A2017224.h09v02.006.2017227165020.hdf'); // eslint-disable-line max-len + t.truthy(mod87HdfFile); + t.is(mod87HdfFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(mod87HdfFile.fileSize, 17865615); + t.is(mod87HdfFile.checksumType, 'CKSUM'); + t.is(mod87HdfFile.checksumValue, 4208254019); + + const mod87MetFile = mod87Granule.files.find((file) => file.name === 'PENS-MOD87GQ.A2017224.h09v02.006.2017227165020.hdf.met'); // eslint-disable-line max-len + t.truthy(mod87MetFile); + t.is(mod87MetFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(mod87MetFile.fileSize, 44118); +}); diff --git 
a/packages/test-data/pdrs/multi-data-type.PDR b/packages/test-data/pdrs/multi-data-type.PDR new file mode 100644 index 00000000000..c918bf4a042 --- /dev/null +++ b/packages/test-data/pdrs/multi-data-type.PDR @@ -0,0 +1,41 @@ +ORIGINATING_SYSTEM = MODAPS_CUMULUS_FPROC; +TOTAL_FILE_COUNT = 4; +EXPIRATION_TIME = 2017-08-22T20:07:21; +OBJECT=FILE_GROUP; + DATA_TYPE = MOD09GQ; + DATA_VERSION = 6; + NODE_NAME = modpdr01; + OBJECT = FILE_SPEC; + DIRECTORY_ID = /MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA; + FILE_ID = MOD09GQ.A2017224.h09v02.006.2017227165020.hdf; + FILE_TYPE = HDF; + FILE_SIZE = 17865615; + FILE_CKSUM_TYPE = CKSUM; + FILE_CKSUM_VALUE = 4208254019; + END_OBJECT = FILE_SPEC; + OBJECT = FILE_SPEC; + DIRECTORY_ID = /MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA; + FILE_ID = MOD09GQ.A2017224.h09v02.006.2017227165020.hdf.met; + FILE_TYPE = METADATA; + FILE_SIZE = 44118; + END_OBJECT = FILE_SPEC; +END_OBJECT = FILE_GROUP; +OBJECT=FILE_GROUP; + DATA_TYPE = MOD87GQ; + DATA_VERSION = 6; + NODE_NAME = modpdr01; + OBJECT = FILE_SPEC; + DIRECTORY_ID = /MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA; + FILE_ID = PENS-MOD87GQ.A2017224.h09v02.006.2017227165020.hdf; + FILE_TYPE = HDF; + FILE_SIZE = 17865615; + FILE_CKSUM_TYPE = CKSUM; + FILE_CKSUM_VALUE = 4208254019; + END_OBJECT = FILE_SPEC; + OBJECT = FILE_SPEC; + DIRECTORY_ID = /MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA; + FILE_ID = PENS-MOD87GQ.A2017224.h09v02.006.2017227165020.hdf.met; + FILE_TYPE = METADATA; + FILE_SIZE = 44118; + END_OBJECT = FILE_SPEC; +END_OBJECT = FILE_GROUP; diff --git a/tasks/discover-pdrs/tests/index.js b/tasks/discover-pdrs/tests/index.js index 26c44669ade..30a732e920c 100644 --- a/tasks/discover-pdrs/tests/index.js +++ b/tasks/discover-pdrs/tests/index.js @@ -36,8 +36,9 @@ test('test pdr discovery with FTP assuming all PDRs are new', async (t) => { try { const output = await discoverPdrs(event); + await validateOutput(t, output); - t.is(output.pdrs.length, 4); + t.is(output.pdrs.length, 5); } catch (err) { if (err instanceof RemoteResourceError) { @@ -137,7 +138,7 @@ test('test pdr discovery with FTP assuming some PDRs are new', async (t) => { }) .then(() => discoverPdrs(newPayload, {})) .then((output) => { - t.is(output.pdrs.length, 3); + t.is(output.pdrs.length, 4); return validateOutput(t, output); }) .then(() => recursivelyDeleteS3Bucket(internalBucketName)) @@ -186,7 +187,7 @@ test('test pdr discovery with HTTP assuming some PDRs are new', async (t) => { await validateOutput(t, output); - t.is(output.pdrs.length, 3); + t.is(output.pdrs.length, 4); const names = output.pdrs.map((p) => p.name); newPdrs.forEach((pdr) => t.true(names.includes(pdr))); } @@ -248,7 +249,7 @@ test('test pdr discovery with SFTP assuming some PDRs are new', async (t) => { await validateOutput(t, output); - t.is(output.pdrs.length, 3); + t.is(output.pdrs.length, 4); const names = output.pdrs.map((p) => p.name); newPdrs.forEach((pdr) => t.true(names.includes(pdr))); } diff --git a/tasks/parse-pdr/index.js b/tasks/parse-pdr/index.js index b4987abb48a..bc2e8cf3088 100644 --- a/tasks/parse-pdr/index.js +++ b/tasks/parse-pdr/index.js @@ -5,7 +5,6 @@ const get = require('lodash.get'); const errors = require('@cumulus/common/errors'); const pdr = require('@cumulus/ingest/pdr'); const log = require('@cumulus/common/log'); -const { justLocalRun } = require('@cumulus/common/local-helpers'); /** * Parse a PDR @@ -17,7 +16,6 @@ const { justLocalRun } = require('@cumulus/common/local-helpers'); * @param {string} event.config.pdrFolder - folder for the PDRs * @param {Object} 
event.config.provider - provider information * @param {Object} event.config.buckets - S3 buckets -* @param {Object} event.config.collection - information about data collection related to task * @returns {Promise.<Object>} - see schemas/output.json for detailed output schema * that is passed to the next task in the workflow **/ @@ -30,8 +28,7 @@ function parsePdr(event) { const parse = new Parse( input.pdr, config.stack, - config.bucket, - config.collection, + config.internalBucket, provider, config.useList ); @@ -71,9 +68,3 @@ function handler(event, context, callback) { cumulusMessageAdapter.runCumulusTask(parsePdr, event, context, callback); } exports.handler = handler; - -// use node index.js local to invoke this -justLocalRun(() => { - const payload = require('@cumulus/test-data/cumulus_messages/parse-pdr.json'); // eslint-disable-line global-require, max-len - handler(payload, {}, (e, r) => console.log(e, r)); -}); diff --git a/tasks/parse-pdr/schemas/config.json b/tasks/parse-pdr/schemas/config.json index bc88e837665..b38a46e391a 100644 --- a/tasks/parse-pdr/schemas/config.json +++ b/tasks/parse-pdr/schemas/config.json @@ -2,6 +2,7 @@ "title": "ParsePdrConfig", "description": "Describes the config used by the parse-pdr task", "type": "object", + "required": [ "stack", "provider", "internalBucket"], "properties": { "pdrFolder": { "type": "string" }, @@ -21,24 +22,7 @@ } } }, - "buckets": { - "type": "object", - "description": "aws s3 buckets used by this task", - "properties": { - "internal": { "type": "string" }, - "private": { "type": "string" }, - "protected": { "type": "string" }, - "public": { "type": "string" } - } - }, - "collection": { - "type": "object", - "required": ["name", "granuleIdExtraction"], - "properties": { - "name": { "type": "string" }, - "granuleIdExtraction": { "type": "string" } - } - }, + "internalBucket": { "type": "string" }, "useList": { "description": "flag to tell ftp server to use 'LIST' instead of 'STAT'", "default": false, diff --git a/tasks/parse-pdr/tests/.eslintrc.json b/tasks/parse-pdr/tests/.eslintrc.json new file mode 100644 index 00000000000..ada42bca77f --- /dev/null +++ b/tasks/parse-pdr/tests/.eslintrc.json @@ -0,0 +1,5 @@ +{ + "rules": { + "no-param-reassign": "off" + } +} diff --git a/tasks/parse-pdr/tests/parse_pdrs_test.js b/tasks/parse-pdr/tests/parse_pdrs_test.js index 900627ffc6a..cb4e93d3499 100644 --- a/tasks/parse-pdr/tests/parse_pdrs_test.js +++ b/tasks/parse-pdr/tests/parse_pdrs_test.js @@ -2,14 +2,12 @@ const errors = require('@cumulus/common/errors'); const fs = require('fs-extra'); -const modis = require('@cumulus/test-data/payloads/new-message-schema/parse.json'); -const path = require('path'); const test = require('ava'); const { recursivelyDeleteS3Bucket, s3 } = require('@cumulus/common/aws'); -const { cloneDeep } = require('lodash'); + +const { CollectionConfigStore } = require('@cumulus/common'); const { - findTestDataDirectory, randomString, validateConfig, validateInput, @@ -18,76 +16,131 @@ const { const { parsePdr } = require('../index'); -test('parse PDR from FTP endpoint', async (t) => { - const internalBucketName = randomString(); +test.beforeEach(async (t) => { + t.context.payload = { + config: { + stack: randomString(), + internalBucket: randomString(), + provider: {} + }, + input: { + pdr: { + name: 'MOD09GQ.PDR', + path: '/pdrs' + } + } + }; + + await s3().createBucket({ Bucket: t.context.payload.config.internalBucket }).promise(); + + const collectionConfig = { + name: 'MOD09GQ', + granuleIdExtraction: '^(.*)\.hdf' };
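+  // CollectionConfigStore.put() below writes this config to
+  // s3://<internalBucket>/<stackName>/collections/MOD09GQ.json, where parsePdr()
+  // will look it up by the FILE_GROUP's DATA_TYPE to extract granuleIds.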
+ + const collectionConfigStore = new CollectionConfigStore( + t.context.payload.config.internalBucket, + t.context.payload.config.stack + ); + await collectionConfigStore.put('MOD09GQ', collectionConfig); +}); - const newPayload = cloneDeep(modis); +test.afterEach(async (t) => { + await recursivelyDeleteS3Bucket(t.context.payload.config.internalBucket); +}); - newPayload.config.bucket = internalBucketName; - newPayload.config.provider = { +test('parse PDR from FTP endpoint', async (t) => { + t.context.payload.config.provider = { id: 'MODAPS', protocol: 'ftp', host: 'localhost', username: 'testuser', password: 'testpass' }; - newPayload.config.useList = true; - - await validateConfig(t, newPayload.config); - - return s3().createBucket({ Bucket: internalBucketName }).promise() - .then(() => parsePdr(newPayload)) - .then((output) => { - t.is(output.granules.length, output.granulesCount); - t.is(output.pdr.name, newPayload.input.pdr.name); - t.is(output.filesCount, 2); - return output; - }) - .then((output) => validateOutput(t, output)) - .then(() => recursivelyDeleteS3Bucket(internalBucketName)) - .catch((err) => { - if (err instanceof errors.RemoteResourceError || err.code === 'AllAccessDisabled') { - t.pass('ignoring this test. Test server seems to be down'); - } - else t.fail(err); - return recursivelyDeleteS3Bucket(internalBucketName); - }); -}); + t.context.payload.config.useList = true; -test('parse PDR from HTTP endpoint', async (t) => { - const internalBucketName = randomString(); + await validateInput(t, t.context.payload.input); + await validateConfig(t, t.context.payload.config); - // Figure out the directory paths that we're working with - const testDataDirectory = path.join(await findTestDataDirectory(), 'pdrs'); + let output; + try { + output = await parsePdr(t.context.payload); - // Create providerPathDirectory and internal bucket - await s3().createBucket({ Bucket: internalBucketName }).promise(); + await validateOutput(t, output); - const pdrName = 'MOD09GQ.PDR'; + t.deepEqual(output.pdr, t.context.payload.input.pdr); + t.is(output.granules.length, 1); + t.is(output.granulesCount, 1); + t.is(output.filesCount, 2); + t.is(output.totalSize, 17909733); + + const granule = output.granules[0]; + t.is(granule.granuleId, 'MOD09GQ.A2017224.h09v02.006.2017227165020'); + t.is(granule.dataType, 'MOD09GQ'); + t.is(granule.granuleSize, 17909733); + + const hdfFile = granule.files.find((f) => + f.name === 'MOD09GQ.A2017224.h09v02.006.2017227165020.hdf'); + t.truthy(hdfFile); + t.is(hdfFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(hdfFile.fileSize, 17865615); + t.is(hdfFile.checksumType, 'CKSUM'); + t.is(hdfFile.checksumValue, 4208254019); - const newPayload = cloneDeep(modis); - newPayload.config.bucket = internalBucketName; - newPayload.config.provider = { + const metFile = granule.files.find((f) => + f.name === 'MOD09GQ.A2017224.h09v02.006.2017227165020.hdf.met'); + t.truthy(metFile); + t.is(metFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(metFile.fileSize, 44118); + } + catch (err) { + if (err instanceof errors.RemoteResourceError || err.code === 'AllAccessDisabled') { + t.pass('ignoring this test. 
Test server seems to be down'); + } + else t.fail(err); + } +}); + +test('parse PDR from HTTP endpoint', async (t) => { + t.context.payload.config.provider = { id: 'MODAPS', protocol: 'http', host: 'http://localhost:3030' }; - newPayload.input = { - pdr: { - name: pdrName, - path: `/pdrs` - } - }; - await validateInput(t, newPayload.input); - await validateConfig(t, newPayload.config); + await validateInput(t, t.context.payload.input); + await validateConfig(t, t.context.payload.config); + let output; try { - const output = await parsePdr(newPayload); + output = await parsePdr(t.context.payload); + await validateOutput(t, output); - t.is(output.granules.length, output.granulesCount); - t.is(output.pdr.name, pdrName); + + t.deepEqual(output.pdr, t.context.payload.input.pdr); + t.is(output.granules.length, 1); + t.is(output.granulesCount, 1); t.is(output.filesCount, 2); + t.is(output.totalSize, 17909733); + + const granule = output.granules[0]; + t.is(granule.granuleId, 'MOD09GQ.A2017224.h09v02.006.2017227165020'); + t.is(granule.dataType, 'MOD09GQ'); + t.is(granule.granuleSize, 17909733); + + const hdfFile = granule.files.find((f) => + f.name === 'MOD09GQ.A2017224.h09v02.006.2017227165020.hdf'); + t.truthy(hdfFile); + t.is(hdfFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(hdfFile.fileSize, 17865615); + t.is(hdfFile.checksumType, 'CKSUM'); + t.is(hdfFile.checksumValue, 4208254019); + + const metFile = granule.files.find((f) => + f.name === 'MOD09GQ.A2017224.h09v02.006.2017227165020.hdf.met'); + t.truthy(metFile); + t.is(metFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(metFile.fileSize, 44118); } catch (err) { if (err instanceof errors.RemoteResourceError || err.code === 'AllAccessDisabled') { @@ -95,23 +148,10 @@ test('parse PDR from HTTP endpoint', async (t) => { } else t.fail(err); } - finally { - // Clean up - await recursivelyDeleteS3Bucket(internalBucketName); - } }); test('parse PDR from SFTP endpoint', async (t) => { - const internalBucketName = randomString(); - - // Create providerPathDirectory and internal bucket - await s3().createBucket({ Bucket: internalBucketName }).promise(); - - const pdrName = 'MOD09GQ.PDR'; - - const newPayload = cloneDeep(modis); - newPayload.config.bucket = internalBucketName; - newPayload.config.provider = { + t.context.payload.config.provider = { id: 'MODAPS', protocol: 'sftp', host: 'localhost', @@ -119,23 +159,40 @@ test('parse PDR from SFTP endpoint', async (t) => { username: 'user', password: 'password' }; - newPayload.input = { - pdr: { - name: pdrName, - path: 'pdrs' - } - }; - await validateInput(t, newPayload.input); - await validateConfig(t, newPayload.config); + await validateInput(t, t.context.payload.input); + await validateConfig(t, t.context.payload.config); + let output; try { - const output = await parsePdr(newPayload); + output = await parsePdr(t.context.payload); await validateOutput(t, output); - t.is(output.granules.length, output.granulesCount); - t.is(output.pdr.name, pdrName); + + t.deepEqual(output.pdr, t.context.payload.input.pdr); + t.is(output.granules.length, 1); + t.is(output.granulesCount, 1); t.is(output.filesCount, 2); + t.is(output.totalSize, 17909733); + + const granule = output.granules[0]; + t.is(granule.granuleId, 'MOD09GQ.A2017224.h09v02.006.2017227165020'); + t.is(granule.dataType, 'MOD09GQ'); + t.is(granule.granuleSize, 17909733); + + const hdfFile = granule.files.find((f) => + f.name === 'MOD09GQ.A2017224.h09v02.006.2017227165020.hdf'); + t.truthy(hdfFile); + t.is(hdfFile.path, 
'/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(hdfFile.fileSize, 17865615); + t.is(hdfFile.checksumType, 'CKSUM'); + t.is(hdfFile.checksumValue, 4208254019); + + const metFile = granule.files.find((f) => + f.name === 'MOD09GQ.A2017224.h09v02.006.2017227165020.hdf.met'); + t.truthy(metFile); + t.is(metFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(metFile.fileSize, 44118); } catch (err) { if (err instanceof errors.RemoteResourceError || err.code === 'AllAccessDisabled') { @@ -143,54 +200,64 @@ test('parse PDR from SFTP endpoint', async (t) => { } else t.fail(err); } - finally { - // Clean up - await recursivelyDeleteS3Bucket(internalBucketName); - } }); test('Parse a PDR from an S3 provider', async (t) => { - const internalBucket = randomString(); - const bucket = randomString(); - const pdrName = 'MOD09GQ.PDR'; - - await Promise.all([ - s3().createBucket({ Bucket: bucket }).promise(), - s3().createBucket({ Bucket: internalBucket }).promise() - ]); - - await s3().putObject({ - Bucket: bucket, - Key: pdrName, - Body: fs.createReadStream('../../packages/test-data/pdrs/MOD09GQ.PDR') - }).promise(); - - const event = cloneDeep(modis); - event.config.bucket = internalBucket; - event.config.provider = { + t.context.payload.config.provider = { id: 'MODAPS', protocol: 's3', - host: bucket + host: randomString() }; + t.context.payload.input.pdr.path = '/pdrs'; - event.input.pdr.path = ''; + await validateInput(t, t.context.payload.input); + await validateConfig(t, t.context.payload.config); - await validateConfig(t, event.config); - await validateInput(t, event.input); + await s3().createBucket({ Bucket: t.context.payload.config.provider.host }).promise(); - let output; try { - output = await parsePdr(event); + await s3().putObject({ + Bucket: t.context.payload.config.provider.host, + Key: `${t.context.payload.input.pdr.path}/${t.context.payload.input.pdr.name}`, + Body: fs.createReadStream('../../packages/test-data/pdrs/MOD09GQ.PDR') + }).promise(); + + const output = await parsePdr(t.context.payload); + + await validateOutput(t, output); + + t.deepEqual(output.pdr, t.context.payload.input.pdr); + t.is(output.granules.length, 1); + t.is(output.granulesCount, 1); + t.is(output.filesCount, 2); + t.is(output.totalSize, 17909733); + + const granule = output.granules[0]; + t.is(granule.granuleId, 'MOD09GQ.A2017224.h09v02.006.2017227165020'); + t.is(granule.dataType, 'MOD09GQ'); + t.is(granule.granuleSize, 17909733); + + const hdfFile = granule.files.find((f) => + f.name === 'MOD09GQ.A2017224.h09v02.006.2017227165020.hdf'); + t.truthy(hdfFile); + t.is(hdfFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(hdfFile.fileSize, 17865615); + t.is(hdfFile.checksumType, 'CKSUM'); + t.is(hdfFile.checksumValue, 4208254019); + + const metFile = granule.files.find((f) => + f.name === 'MOD09GQ.A2017224.h09v02.006.2017227165020.hdf.met'); + t.truthy(metFile); + t.is(metFile.path, '/MODOPS/MODAPS/EDC/CUMULUS/FPROC/DATA'); + t.is(metFile.fileSize, 44118); + } + catch (err) { + if (err instanceof errors.RemoteResourceError || err.code === 'AllAccessDisabled') { + t.pass('ignoring this test. 
Test server seems to be down'); + } + else t.fail(err); } finally { - await Promise.all([ - recursivelyDeleteS3Bucket(bucket), - recursivelyDeleteS3Bucket(internalBucket) - ]); + await recursivelyDeleteS3Bucket(t.context.payload.config.provider.host); } - - await validateOutput(t, output); - t.is(output.granules.length, output.granulesCount); - t.is(output.pdr.name, pdrName); - t.is(output.filesCount, 2); }); diff --git a/tasks/queue-granules/index.js b/tasks/queue-granules/index.js index 2189c21b93e..7e291955554 100644 --- a/tasks/queue-granules/index.js +++ b/tasks/queue-granules/index.js @@ -2,6 +2,7 @@ const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js'); const { enqueueGranuleIngestMessage } = require('@cumulus/ingest/queue'); +const { CollectionConfigStore } = require('@cumulus/common'); /** * See schemas/input.json and schemas/config.json for detailed event description @@ -13,14 +14,22 @@ const { enqueueGranuleIngestMessage } = require('@cumulus/ingest/queue'); async function queueGranules(event) { const granules = event.input.granules || []; - const executionArns = await Promise.all(granules.map((granule) => enqueueGranuleIngestMessage( - granule, - event.config.queueUrl, - event.config.granuleIngestMessageTemplateUri, - event.config.provider, - event.config.collection, - event.input.pdr - ))); + const collectionConfigStore = + new CollectionConfigStore(event.config.internalBucket, event.config.stackName); + + const executionArns = await Promise.all( // eslint-disable-line function-paren-newline + granules.map(async (granule) => { + const collectionConfig = await collectionConfigStore.get(granule.dataType); + + return enqueueGranuleIngestMessage( + granule, + event.config.queueUrl, + event.config.granuleIngestMessageTemplateUri, + event.config.provider, + collectionConfig, + event.input.pdr + ); + })); const result = { running: executionArns }; if (event.input.pdr) result.pdr = event.input.pdr; diff --git a/tasks/queue-granules/schemas/config.json b/tasks/queue-granules/schemas/config.json index adff13c2691..df62b41bc50 100644 --- a/tasks/queue-granules/schemas/config.json +++ b/tasks/queue-granules/schemas/config.json @@ -3,16 +3,18 @@ "description": "Describes the config used by the queue-granules task", "type": "object", "required": [ - "collection", "provider", "queueUrl", - "granuleIngestMessageTemplateUri" + "granuleIngestMessageTemplateUri", + "stackName", + "internalBucket" ], "additionalProperties": false, "properties": { - "collection": { "type": "object" }, + "granuleIngestMessageTemplateUri": { "type": "string" }, + "internalBucket": { "type": "string" }, "provider": { "type": "object" }, "queueUrl": { "type": "string" }, - "granuleIngestMessageTemplateUri": { "type": "string" } + "stackName": { "type": "string" } } } diff --git a/tasks/queue-granules/tests/index.js b/tasks/queue-granules/tests/index.js index 29c9cda11d6..567d00ab692 100644 --- a/tasks/queue-granules/tests/index.js +++ b/tasks/queue-granules/tests/index.js @@ -3,7 +3,10 @@ const test = require('ava'); const { - createQueue, s3, sqs, recursivelyDeleteS3Bucket + createQueue, + s3, + sqs, + recursivelyDeleteS3Bucket } = require('@cumulus/common/aws'); const { randomString, @@ -11,14 +14,22 @@ const { validateInput, validateOutput } = require('@cumulus/common/test-utils'); +const { CollectionConfigStore } = require('@cumulus/common'); const { queueGranules } = require('../index'); test.beforeEach(async (t) => { + t.context.internalBucket = `internal-bucket-${randomString().slice(0, 
6)}`; + t.context.stackName = `stack-${randomString().slice(0, 6)}`; t.context.stateMachineArn = randomString(); - t.context.templateBucket = randomString(); - await s3().createBucket({ Bucket: t.context.templateBucket }).promise(); + t.context.collectionConfigStore = + new CollectionConfigStore(t.context.internalBucket, t.context.stackName); + + await Promise.all([ + s3().createBucket({ Bucket: t.context.internalBucket }).promise(), + s3().createBucket({ Bucket: t.context.templateBucket }).promise() + ]); t.context.messageTemplate = { cumulus_meta: { @@ -35,7 +46,8 @@ test.beforeEach(async (t) => { t.context.event = { config: { - collection: { name: 'collection-name' }, + internalBucket: t.context.internalBucket, + stackName: t.context.stackName, provider: { name: 'provider-name' }, queueUrl: await createQueue(), granuleIngestMessageTemplateUri: `s3://${t.context.templateBucket}/${messageTemplateKey}` @@ -48,16 +60,21 @@ test.beforeEach(async (t) => { test.afterEach(async (t) => { await Promise.all([ + recursivelyDeleteS3Bucket(t.context.internalBucket), recursivelyDeleteS3Bucket(t.context.templateBucket), sqs().deleteQueue({ QueueUrl: t.context.event.config.queueUrl }).promise() ]); }); test('The correct output is returned when granules are queued without a PDR', async (t) => { + const dataType = `data-type-${randomString().slice(0, 6)}`; + const collectionConfig = { foo: 'bar' }; + await t.context.collectionConfigStore.put(dataType, collectionConfig); + const { event } = t.context; event.input.granules = [ - { granuleId: randomString(), files: [] }, - { granuleId: randomString(), files: [] } + { dataType, granuleId: randomString(), files: [] }, + { dataType, granuleId: randomString(), files: [] } ]; await validateConfig(t, event.config); @@ -71,10 +88,14 @@ test('The correct output is returned when granules are queued without a PDR', as }); test('The correct output is returned when granules are queued with a PDR', async (t) => { + const dataType = `data-type-${randomString().slice(0, 6)}`; + const collectionConfig = { foo: 'bar' }; + await t.context.collectionConfigStore.put(dataType, collectionConfig); + const { event } = t.context; event.input.granules = [ - { granuleId: randomString(), files: [] }, - { granuleId: randomString(), files: [] } + { dataType, granuleId: randomString(), files: [] }, + { dataType, granuleId: randomString(), files: [] } ]; event.input.pdr = { name: randomString(), path: randomString() }; @@ -89,6 +110,10 @@ test('The correct output is returned when granules are queued with a PDR', async }); test('The correct output is returned when no granules are queued', async (t) => { + const dataType = `data-type-${randomString().slice(0, 6)}`; + const collectionConfig = { foo: 'bar' }; + await t.context.collectionConfigStore.put(dataType, collectionConfig); + const { event } = t.context; event.input.granules = []; @@ -102,10 +127,14 @@ test('The correct output is returned when no granules are queued', async (t) => }); test('Granules are added to the queue', async (t) => { + const dataType = `data-type-${randomString().slice(0, 6)}`; + const collectionConfig = { foo: 'bar' }; + await t.context.collectionConfigStore.put(dataType, collectionConfig); + const { event } = t.context; event.input.granules = [ - { granuleId: randomString(), files: [] }, - { granuleId: randomString(), files: [] } + { dataType, granuleId: randomString(), files: [] }, + { dataType, granuleId: randomString(), files: [] } ]; await validateConfig(t, event.config); @@ -127,22 +156,28 @@ test('Granules 
are added to the queue', async (t) => { }); test('The correct message is enqueued without a PDR', async (t) => { - const fileNameA = randomString(); - const granuleIdA = randomString(); - const fileNameB = randomString(); - const granuleIdB = randomString(); + const event = t.context.event; - const { event } = t.context; - event.input.granules = [ - { - granuleId: granuleIdA, - files: [{ name: fileNameA }] - }, - { - granuleId: granuleIdB, - files: [{ name: fileNameB }] - } - ]; + const granule1 = { + dataType: `data-type-${randomString().slice(0, 6)}`, + granuleId: `granule-${randomString().slice(0, 6)}`, + files: [{ name: `file-${randomString().slice(0, 6)}` }] + }; + const collectionConfig1 = { name: `collection-config-${randomString().slice(0, 6)}` }; + + const granule2 = { + dataType: `data-type-${randomString().slice(0, 6)}`, + granuleId: `granule-${randomString().slice(0, 6)}`, + files: [{ name: `file-${randomString().slice(0, 6)}` }] + }; + const collectionConfig2 = { name: `collection-config-${randomString().slice(0, 6)}` }; + + event.input.granules = [granule1, granule2]; + + await Promise.all([ + t.context.collectionConfigStore.put(granule1.dataType, collectionConfig1), + t.context.collectionConfigStore.put(granule2.dataType, collectionConfig2) + ]); await validateConfig(t, event.config); await validateInput(t, event.input); @@ -151,42 +186,6 @@ test('The correct message is enqueued without a PDR', async (t) => { await validateOutput(t, output); - const expectedMessages = {}; - expectedMessages[granuleIdA] = { - cumulus_meta: { - state_machine: t.context.stateMachineArn - }, - meta: { - collection: { name: 'collection-name' }, - provider: { name: 'provider-name' } - }, - payload: { - granules: [ - { - granuleId: granuleIdA, - files: [{ name: fileNameA }] - } - ] - } - }; - expectedMessages[granuleIdB] = { - cumulus_meta: { - state_machine: t.context.stateMachineArn - }, - meta: { - collection: { name: 'collection-name' }, - provider: { name: 'provider-name' } - }, - payload: { - granules: [ - { - granuleId: granuleIdB, - files: [{ name: fileNameB }] - } - ] - } - }; - // Get messages from the queue const receiveMessageResponse = await sqs().receiveMessage({ QueueUrl: t.context.event.config.queueUrl, @@ -195,34 +194,90 @@ test('The correct message is enqueued without a PDR', async (t) => { }).promise(); const messages = receiveMessageResponse.Messages.map((message) => JSON.parse(message.Body)); - const receivedGranuleIds = messages.map((message) => message.payload.granules[0].granuleId); - t.true(receivedGranuleIds.includes(granuleIdA)); - t.true(receivedGranuleIds.includes(granuleIdB)); - t.is(messages.length, 2); - messages.forEach((message) => { - const { granuleId } = message.payload.granules[0]; - t.truthy(message.cumulus_meta.execution_name); - expectedMessages[granuleId].cumulus_meta.execution_name = message.cumulus_meta.execution_name; - t.deepEqual(message, expectedMessages[granuleId]); - }); -}); -test('The correct message is enqueued with a PDR', async (t) => { - const fileName = randomString(); - const granuleId = randomString(); - const pdrName = randomString(); - const pdrPath = randomString(); + const message1 = messages.find((message) => + message.payload.granules[0].granuleId === granule1.granuleId); - const { event } = t.context; - event.input.granules = [ + t.truthy(message1); + t.deepEqual( + message1, { - granuleId, - files: [{ name: fileName }] + cumulus_meta: { + // The execution name is randomly generated, so we don't care what the value is here + 
execution_name: message1.cumulus_meta.execution_name, + state_machine: t.context.stateMachineArn + }, + meta: { + collection: collectionConfig1, + provider: { name: 'provider-name' } + }, + payload: { + granules: [ + { + granuleId: granule1.granuleId, + files: granule1.files + } + ] + } } - ]; + ); + + const message2 = messages.find((message) => + message.payload.granules[0].granuleId === granule2.granuleId); + t.truthy(message2); + t.deepEqual( + message2, + { + cumulus_meta: { + // The execution name is randomly generated, so we don't care what the value is here + execution_name: message2.cumulus_meta.execution_name, + state_machine: t.context.stateMachineArn + }, + meta: { + collection: collectionConfig2, + provider: { name: 'provider-name' } + }, + payload: { + granules: [ + { + granuleId: granule2.granuleId, + files: granule2.files + } + ] + } + } + ); +}); + +test('The correct message is enqueued with a PDR', async (t) => { + const event = t.context.event; + + const pdrName = `pdr-name-${randomString()}`; + const pdrPath = `pdr-path-${randomString()}`; event.input.pdr = { name: pdrName, path: pdrPath }; + const granule1 = { + dataType: `data-type-${randomString().slice(0, 6)}`, + granuleId: `granule-${randomString().slice(0, 6)}`, + files: [{ name: `file-${randomString().slice(0, 6)}` }] + }; + const collectionConfig1 = { name: `collection-config-${randomString().slice(0, 6)}` }; + + const granule2 = { + dataType: `data-type-${randomString().slice(0, 6)}`, + granuleId: `granule-${randomString().slice(0, 6)}`, + files: [{ name: `file-${randomString().slice(0, 6)}` }] + }; + const collectionConfig2 = { name: `collection-config-${randomString().slice(0, 6)}` }; + + event.input.granules = [granule1, granule2]; + + await Promise.all([ + t.context.collectionConfigStore.put(granule1.dataType, collectionConfig1), + t.context.collectionConfigStore.put(granule2.dataType, collectionConfig2) + ]); + await validateConfig(t, event.config); await validateInput(t, event.input); @@ -236,33 +291,64 @@ test('The correct message is enqueued with a PDR', async (t) => { MaxNumberOfMessages: 10, WaitTimeSeconds: 1 }).promise(); - const messages = receiveMessageResponse.Messages; + const messages = receiveMessageResponse.Messages.map((message) => JSON.parse(message.Body)); - const expectedMessage = { - cumulus_meta: { - state_machine: t.context.stateMachineArn - }, - meta: { - collection: { name: 'collection-name' }, - provider: { name: 'provider-name' }, - pdr: { - name: pdrName, - path: pdrPath + t.is(messages.length, 2); + + const message1 = messages.find((message) => + message.payload.granules[0].granuleId === granule1.granuleId); + + t.truthy(message1); + t.deepEqual( + message1, + { + cumulus_meta: { + // The execution name is randomly generated, so we don't care what the value is here + execution_name: message1.cumulus_meta.execution_name, + state_machine: t.context.stateMachineArn + }, + meta: { + pdr: event.input.pdr, + collection: collectionConfig1, + provider: { name: 'provider-name' } + }, + payload: { + granules: [ + { + granuleId: granule1.granuleId, + files: granule1.files + } + ] } - }, - payload: { - granules: [ - { - granuleId, - files: [{ name: fileName }] - } - ] } - }; - const message = JSON.parse(messages[0].Body); - t.truthy(message.cumulus_meta.execution_name); - expectedMessage.cumulus_meta.execution_name = message.cumulus_meta.execution_name; - t.deepEqual(message, expectedMessage); + ); + + const message2 = messages.find((message) => + message.payload.granules[0].granuleId === 
+    message.payload.granules[0].granuleId === granule2.granuleId);
+  t.truthy(message2);
+  t.deepEqual(
+    message2,
+    {
+      cumulus_meta: {
+        // The execution name is randomly generated, so we don't care what the value is here
+        execution_name: message2.cumulus_meta.execution_name,
+        state_machine: t.context.stateMachineArn
+      },
+      meta: {
+        pdr: event.input.pdr,
+        collection: collectionConfig2,
+        provider: { name: 'provider-name' }
+      },
+      payload: {
+        granules: [
+          {
+            granuleId: granule2.granuleId,
+            files: granule2.files
+          }
+        ]
+      }
+    }
+  );
 });
 
 test.todo('An appropriate error is thrown if the message template could not be fetched');
diff --git a/tasks/sync-granule/package.json b/tasks/sync-granule/package.json
index a906ce1b868..d1f130ffd23 100644
--- a/tasks/sync-granule/package.json
+++ b/tasks/sync-granule/package.json
@@ -18,8 +18,7 @@
     "test": "ava",
     "test-coverage": "nyc ava",
     "build": "rm -rf dist && mkdir dist && cp -R schemas dist/ && webpack --progress",
-    "watch": "rm -rf dist && mkdir dist && cp -R schemas dist/ && webpack --progress -w",
-    "postinstall": "npm run build"
+    "watch": "rm -rf dist && mkdir dist && cp -R schemas dist/ && webpack --progress -w"
   },
   "author": "Cumulus Authors",
   "license": "Apache-2.0",
diff --git a/tests/.eslintrc.json b/tests/.eslintrc.json
new file mode 100644
index 00000000000..ada42bca77f
--- /dev/null
+++ b/tests/.eslintrc.json
@@ -0,0 +1,5 @@
+{
+  "rules": {
+    "no-param-reassign": "off"
+  }
+}
diff --git a/tests/fixtures/workflows/pdr_parse_ingest.json b/tests/fixtures/workflows/pdr_parse_ingest.json
index 70c03187bb5..830b0d6c08e 100644
--- a/tests/fixtures/workflows/pdr_parse_ingest.json
+++ b/tests/fixtures/workflows/pdr_parse_ingest.json
@@ -44,7 +44,7 @@
         "useQueue": true,
         "provider": "{{$.meta.provider}}",
         "collection": "{{$.meta.collection}}",
-        "bucket": "{{$.meta.buckets.internal}}",
+        "internalBucket": "{{$.meta.buckets.internal}}",
         "stack": "{{$.meta.stack}}",
         "templateUri": "{{$.meta.templates.IngestGranule}}",
         "queueUrl": "{{$.meta.queues.startSF}}"
@@ -52,4 +52,4 @@
     }
   ]
 }
-}
\ No newline at end of file
+}
diff --git a/tests/ftp_pdr_parse_ingest.js b/tests/ftp_pdr_parse_ingest.js
index b7f9b31f771..a0589ede054 100644
--- a/tests/ftp_pdr_parse_ingest.js
+++ b/tests/ftp_pdr_parse_ingest.js
@@ -19,6 +19,7 @@ const {
   sqs,
   receiveSQSMessages
 } = require('../packages/common/aws');
+const { CollectionConfigStore } = require('../packages/common');
 const workflowSet = require('./fixtures/workflows/pdr_parse_ingest.json');
 const collections = require('./fixtures/collections.json');
 const providers = require('./fixtures/providers.json');
@@ -31,12 +32,20 @@
 const context = {};
 const cmaFolder = 'cumulus-message-adapter';
 
-test.before(async() => {
+test.before(async () => {
   context.internal = randomString();
   context.stack = randomString();
   context.templates = {};
   await s3().createBucket({ Bucket: context.internal }).promise();
 
+  const collectionConfigStore = new CollectionConfigStore(context.internal, context.stack);
+  await Promise.all([
+    collectionConfigStore.put('MOD09GQ', { name: 'MOD09GQ', granuleIdExtraction: '(.*)' }),
+    collectionConfigStore.put('AST_L1A', { name: 'AST_L1A', granuleIdExtraction: '(.*)' }),
+    collectionConfigStore.put('MOD87GQ', { name: 'MOD87GQ', granuleIdExtraction: '(.*)' }),
+    collectionConfigStore.put('MYD13A1', { name: 'MYD13A1', granuleIdExtraction: '(.*)' })
+  ]);
+
   // download and unzip the message adapter
   const { src, dest } = await downloadCMA();
   context.src = src;
@@ -91,7 +100,8 @@ test.serial('Discover and queue PDRs with FTP provider', async (t) => {
   // discover-pdr must return a list of PDRs
   const pdrs = msg.stepOutputs.DiscoverPdrs.payload.pdrs;
+  // all five PDRs provided by the test fixtures should be discovered
   t.true(Array.isArray(pdrs));
-  t.is(pdrs.length, 4);
+  t.is(pdrs.length, 5);
   t.is(msg.output.payload.pdrs_queued, pdrs.length);
 });
 
@@ -105,23 +114,24 @@ test.serial('Parse Pdrs from the previous step', async (t) => {
   const messages = await receiveSQSMessages(context.queueUrl, 4);
 
-  for (const input of messages) {
-    const msg = await runWorkflow(workflow, input.Body);
-    t.truthy(msg.input.payload.pdr);
-    t.is(
-      msg.output.payload.granules.length,
-      msg.output.payload.granulesCount
-    );
-  }
+  await Promise.all( // eslint-disable-line function-paren-newline
+    messages.map(async (input) => {
+      const msg = await runWorkflow(workflow, input.Body);
+      t.truthy(msg.input.payload.pdr);
+      t.is(
+        msg.output.payload.granules.length,
+        msg.output.payload.granulesCount
+      );
+    }));
 });
 
-test.afterEach.always(async(t) => {
-  await deleteCMAFromTasks(t.context.workflow, cmaFolder);
-});
+test.afterEach.always(async (t) =>
+  deleteCMAFromTasks(t.context.workflow, cmaFolder));
 
-test.after.always('final cleanup', async() => {
-  await recursivelyDeleteS3Bucket(context.internal);
-  await sqs().deleteQueue({ QueueUrl: context.queueUrl }).promise();
-  await fs.remove(context.src);
-  await fs.remove(context.dest);
-});
+test.after.always('final cleanup', () =>
+  Promise.all([
+    recursivelyDeleteS3Bucket(context.internal),
+    sqs().deleteQueue({ QueueUrl: context.queueUrl }).promise(),
+    fs.remove(context.src),
+    fs.remove(context.dest)
+  ]));
diff --git a/tests/sftp_pdr_parse_ingest.js b/tests/sftp_pdr_parse_ingest.js
index eccac52b338..babeab30f5e 100644
--- a/tests/sftp_pdr_parse_ingest.js
+++ b/tests/sftp_pdr_parse_ingest.js
@@ -17,6 +17,7 @@ const {
   sqs,
   receiveSQSMessages
 } = require('../packages/common/aws');
+const { CollectionConfigStore } = require('../packages/common');
 const workflowSet = require('./fixtures/workflows/pdr_parse_ingest.json');
 const collections = require('./fixtures/collections.json');
 const providers = require('./fixtures/providers.json');
@@ -28,13 +29,20 @@
 const context = {};
 const cmaFolder = 'cumulus-message-adapter';
-
-test.before(async() => {
+test.before(async () => {
   context.internal = randomString();
   context.stack = randomString();
   context.templates = {};
   await s3().createBucket({ Bucket: context.internal }).promise();
 
+  const collectionConfigStore = new CollectionConfigStore(context.internal, context.stack);
+  await Promise.all([
+    collectionConfigStore.put('MOD09GQ', { name: 'MOD09GQ', granuleIdExtraction: '(.*)' }),
+    collectionConfigStore.put('AST_L1A', { name: 'AST_L1A', granuleIdExtraction: '(.*)' }),
+    collectionConfigStore.put('MOD87GQ', { name: 'MOD87GQ', granuleIdExtraction: '(.*)' }),
+    collectionConfigStore.put('MYD13A1', { name: 'MYD13A1', granuleIdExtraction: '(.*)' })
+  ]);
+
   // download and unzip the message adapter
   const { src, dest } = await downloadCMA();
   context.src = src;
   context.dest = dest;
@@ -75,7 +83,7 @@
   await Promise.all(promises);
 });
 
-test.serial('Discover and queue PDRs with FTP provider', async (t) => {
+test.serial('Discover and queue PDRs with SFTP provider', async (t) => {
   const workflow = workflowSet.DiscoverPdrs;
   t.context.workflow = workflow;
   const input = context.templates.DiscoverPdrs;
@@ -89,7 +97,8 @@ test.serial('Discover and queue PDRs with FTP provider', async (t) => {
   // discover-pdr must return a list of PDRs
   const pdrs = msg.stepOutputs.DiscoverPdrs.payload.pdrs;
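+  // all five PDRs provided by the test fixtures should be discovered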
   t.true(Array.isArray(pdrs));
-  t.is(pdrs.length, 4);
+  t.is(pdrs.length, 5);
   t.is(msg.output.payload.pdrs_queued, pdrs.length);
 });
 
@@ -103,23 +111,24 @@ test.serial('Parse Pdrs from the previous step', async (t) => {
   const messages = await receiveSQSMessages(context.queueUrl, 4);
 
-  for (const input of messages) {
-    const msg = await runWorkflow(workflow, input.Body);
-    t.truthy(msg.input.payload.pdr);
-    t.is(
-      msg.output.payload.granules.length,
-      msg.output.payload.granulesCount
-    );
-  }
+  await Promise.all( // eslint-disable-line function-paren-newline
+    messages.map(async (input) => {
+      const msg = await runWorkflow(workflow, input.Body);
+      t.truthy(msg.input.payload.pdr);
+      t.is(
+        msg.output.payload.granules.length,
+        msg.output.payload.granulesCount
+      );
+    }));
 });
 
-test.afterEach.always(async(t) => {
-  await deleteCMAFromTasks(t.context.workflow, cmaFolder);
-});
+test.afterEach.always(async (t) =>
+  deleteCMAFromTasks(t.context.workflow, cmaFolder));
 
-test.after.always('final cleanup', async() => {
-  await recursivelyDeleteS3Bucket(context.internal);
-  await sqs().deleteQueue({ QueueUrl: context.queueUrl }).promise();
-  await fs.remove(context.src);
-  await fs.remove(context.dest);
-});
+test.after.always('final cleanup', () =>
+  Promise.all([
+    recursivelyDeleteS3Bucket(context.internal),
+    sqs().deleteQueue({ QueueUrl: context.queueUrl }).promise(),
+    fs.remove(context.src),
+    fs.remove(context.dest)
+  ]));
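Note on the assertion pattern in the queue-granules tests above: each test locates the message
for a granule, copies the randomly generated cumulus_meta.execution_name into the expected
object, and then compares the whole message with t.deepEqual. If that pattern keeps growing, it
could be factored into a shared helper. A minimal sketch follows; the helper name
(assertEnqueuedMessage) and the optional pdr parameter are hypothetical, not part of this change.

// Hypothetical helper sketching the assertion pattern used in these tests.
// `t` is the AVA execution context; `messages` are the parsed SQS message bodies.
function assertEnqueuedMessage(t, messages, granule, collectionConfig, pdr) {
  const message = messages.find((m) =>
    m.payload.granules[0].granuleId === granule.granuleId);
  t.truthy(message);

  const expectedMeta = {
    collection: collectionConfig,
    provider: { name: 'provider-name' }
  };
  // The "with a PDR" test additionally expects the PDR to be passed through in meta
  if (pdr) expectedMeta.pdr = pdr;

  t.deepEqual(message, {
    cumulus_meta: {
      // execution_name is random, so copy it from the received message
      execution_name: message.cumulus_meta.execution_name,
      state_machine: t.context.stateMachineArn
    },
    meta: expectedMeta,
    payload: {
      granules: [{ granuleId: granule.granuleId, files: granule.files }]
    }
  });
}

// Example usage inside either test:
// assertEnqueuedMessage(t, messages, granule1, collectionConfig1, event.input.pdr);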