diff --git a/cumulus/tasks/discover-granules/index.js b/cumulus/tasks/discover-granules/index.js index eaa912577d2..73e786c5b8b 100644 --- a/cumulus/tasks/discover-granules/index.js +++ b/cumulus/tasks/discover-granules/index.js @@ -1,7 +1,6 @@ 'use strict'; const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js'); -const get = require('lodash.get'); const granule = require('@cumulus/ingest/granule'); const log = require('@cumulus/common/log'); @@ -15,9 +14,8 @@ const log = require('@cumulus/common/log'); **/ async function discoverGranules(event) { const protocol = event.config.provider.protocol; - const useQueue = get(event.config, 'useQueue', true); - const Discover = granule.selector('discover', protocol, useQueue); + const Discover = granule.selector('discover', protocol); const discover = new Discover(event); let granules; diff --git a/cumulus/tasks/discover-granules/package.json b/cumulus/tasks/discover-granules/package.json index b0f5c59612e..05678c8c240 100644 --- a/cumulus/tasks/discover-granules/package.json +++ b/cumulus/tasks/discover-granules/package.json @@ -54,6 +54,7 @@ "@ava/babel-preset-stage-4": "^1.1.0", "@ava/babel-preset-transform-test-files": "^3.0.0", "ava": "^0.21.0", + "fs-extra": "^5.0.0", "lodash.clonedeep": "^4.5.0", "sinon": "^2.0.0-pre.5" } diff --git a/cumulus/tasks/discover-granules/schemas/config.json b/cumulus/tasks/discover-granules/schemas/config.json index 23cafccb955..7b276647fbc 100644 --- a/cumulus/tasks/discover-granules/schemas/config.json +++ b/cumulus/tasks/discover-granules/schemas/config.json @@ -2,12 +2,11 @@ "title": "DiscoverGranulesConfig", "description": "Describes the config used by the discover-granules task", "type": "object", + "required": [ "provider" ], "properties": { - "useQueue": { - "type": "boolean" - }, "provider": { "type": "object", + "required": [ "host", "protocol" ], "properties": { "id": { "type": "string" }, "username": { "type": "string" }, @@ -56,4 +55,4 @@ } } } -} \ No newline at end of file +} diff --git a/cumulus/tasks/discover-granules/schemas/output.json b/cumulus/tasks/discover-granules/schemas/output.json index 4a7abff9ba5..fed81f935e2 100644 --- a/cumulus/tasks/discover-granules/schemas/output.json +++ b/cumulus/tasks/discover-granules/schemas/output.json @@ -2,20 +2,34 @@ "title": "DiscoverGranulesOutput", "description": "Describes the output produced by the discover-granules task", "type": "object", - "required": ["granules"], + "required": [ "granules" ], "additionalProperties": false, "properties": { - "granules_found": { "type": "integer" }, "granules": { "type": "array", "items": { "type": "object", - "additionalProperties": false, + "required": [ "granuleId", "files" ], "properties": { - "name": { "type": "string" }, "granuleId": { "type": "string" }, - "bucket": { "type": "string" }, - "url_path": { "type": "string" } + "files": { + "type": "array", + "items": { + "type": "object", + "required": [ "name", "path" ], + "properties": { + "name": { "type": "string" }, + "path": { "type": "string" }, + "size": { "type": "integer" }, + "time": { + "description": "The number of milliseconds since January 1, 1970, 00:00:00 UTC", + "type": "integer" + }, + "bucket": { "type": "string" }, + "url_path": { "type": "string" } + } + } + } } } } diff --git a/cumulus/tasks/discover-granules/tests/index.js b/cumulus/tasks/discover-granules/tests/index.js new file mode 100644 index 00000000000..f4db4ae28c8 --- /dev/null +++ b/cumulus/tasks/discover-granules/tests/index.js @@ -0,0 +1,136 @@ +'use 
strict';
+
+const fs = require('fs-extra');
+const path = require('path');
+const test = require('ava');
+const mur = require('./fixtures/mur.json');
+const { cloneDeep } = require('lodash');
+const { recursivelyDeleteS3Bucket, s3 } = require('@cumulus/common/aws');
+const {
+  findGitRepoRootDirectory,
+  randomString,
+  validateConfig,
+  validateOutput
+} = require('@cumulus/common/test-utils');
+const { discoverGranules } = require('../index');
+
+test('discover granules using FTP', async (t) => {
+  const event = cloneDeep(mur);
+
+  await validateConfig(t, event.config);
+
+  try {
+    const output = await discoverGranules(event);
+
+    await validateOutput(t, output);
+    t.is(output.granules.length, 3);
+    t.is(output.granules[0].files.length, 2);
+  }
+  catch (e) {
+    if (e.message.includes('getaddrinfo ENOTFOUND')) {
+      t.pass('Ignoring this test. Test server seems to be down');
+    }
+    else t.fail(e);
+  }
+});
+
+test('discover granules using SFTP', async (t) => {
+  const internalBucketName = randomString();
+  const providerPath = randomString();
+
+  // Figure out the directory paths that we're working with
+  const gitRepoRootDirectory = await findGitRepoRootDirectory();
+  const providerPathDirectory = path.join(gitRepoRootDirectory, 'tmp-test-data', providerPath);
+
+  // Create providerPathDirectory and internal bucket
+  await Promise.all([
+    fs.ensureDir(providerPathDirectory),
+    s3().createBucket({ Bucket: internalBucketName }).promise()
+  ]);
+
+  // Stage sample files
+  const files = [
+    'granule-1.nc', 'granule-1.nc.md5',
+    'granule-2.nc', 'granule-2.nc.md5',
+    'granule-3.nc', 'granule-3.nc.md5'
+  ];
+  await Promise.all(files.map((file) =>
+    fs.outputFile(path.join(providerPathDirectory, file), `This is ${file}`)));
+
+  const event = cloneDeep(mur);
+  // The test-data prefix is required in the provider_path because of the way
+  // that the sftp container is configured in docker-compose.yml.
+  event.config.collection.provider_path = `test-data/${providerPath}`;
+  event.config.provider = {
+    id: 'MODAPS',
+    protocol: 'sftp',
+    host: 'localhost',
+    port: 2222,
+    username: 'user',
+    password: 'password'
+  };
+
+  await validateConfig(t, event.config);
+
+  try {
+    const output = await discoverGranules(event);
+    await validateOutput(t, output);
+    t.is(output.granules.length, 3);
+    t.is(output.granules[0].files.length, 2);
+  }
+  finally {
+    // Clean up
+    await Promise.all([
+      recursivelyDeleteS3Bucket(internalBucketName),
+      fs.remove(providerPathDirectory)
+    ]);
+  }
+});
+
+test('discover granules using HTTP', async (t) => {
+  const internalBucketName = randomString();
+  const providerPath = randomString();
+
+  // Figure out the directory paths that we're working with
+  const gitRepoRootDirectory = await findGitRepoRootDirectory();
+  const providerPathDirectory = path.join(gitRepoRootDirectory, 'tmp-test-data', providerPath);
+
+  // Create providerPathDirectory and internal bucket
+  await Promise.all([
+    fs.ensureDir(providerPathDirectory),
+    s3().createBucket({ Bucket: internalBucketName }).promise()
+  ]);
+
+  // Stage sample files
+  const files = [
+    'granule-1.nc', 'granule-1.nc.md5',
+    'granule-2.nc', 'granule-2.nc.md5',
+    'granule-3.nc', 'granule-3.nc.md5'
+  ];
+  await Promise.all(files.map((file) =>
+    fs.outputFile(path.join(providerPathDirectory, file), `This is ${file}`)));
+
+  const event = cloneDeep(mur);
+  event.config.collection.provider_path = providerPath;
+  event.config.provider = {
+    id: 'MODAPS',
+    protocol: 'http',
+    host: 'http://localhost:8080'
+  };
+
+  await validateConfig(t, event.config);
+
+  try {
+    const output = await discoverGranules(event);
+    await validateOutput(t, output);
+    t.is(output.granules.length, 3);
+    t.is(output.granules[0].files.length, 2);
+  }
+  finally {
+    // Clean up
+    await Promise.all([
+      recursivelyDeleteS3Bucket(internalBucketName),
+      fs.remove(providerPathDirectory)
+    ]);
+  }
+});
diff --git a/cumulus/tasks/discover-granules/tests/test.js b/cumulus/tasks/discover-granules/tests/test.js
deleted file mode 100644
index 6c20258e478..00000000000
--- a/cumulus/tasks/discover-granules/tests/test.js
+++ /dev/null
@@ -1,80 +0,0 @@
-'use strict';
-
-const test = require('ava');
-const mur = require('./fixtures/mur.json');
-const { cloneDeep } = require('lodash');
-const { recursivelyDeleteS3Bucket, s3, sqs } = require('@cumulus/common/aws');
-const { createQueue, randomString } = require('@cumulus/common/test-utils');
-const { discoverGranules } = require('../index');
-
-async function uploadMessageTemplate(Bucket) {
-  const templateKey = randomString();
-
-  const messageTemplate = {
-    cumulus_meta: {
-      state_machine: randomString()
-    },
-    meta: {},
-    payload: {},
-    exception: null
-  };
-
-  await s3().putObject({
-    Bucket,
-    Key: templateKey,
-    Body: JSON.stringify(messageTemplate)
-  }).promise();
-
-  return `s3://${Bucket}/${templateKey}`;
-}
-
-test('test discovering mur granules', async (t) => {
-  const event = cloneDeep(mur);
-  event.config.useQueue = false;
-
-  try {
-    const output = await discoverGranules(event);
-    t.is(output.granules.length, 3);
-    t.is(output.granules[0].files.length, 2);
-  }
-  catch (e) {
-    if (e.message.includes('getaddrinfo ENOTFOUND')) {
-      t.pass('Ignoring this test.
Test server seems to be down'); - } - else t.fail(e); - } -}); - -test('test discovering mur granules over FTP with queue', async (t) => { - const internalBucket = randomString(); - const messageTemplateBucket = randomString(); - await Promise.all([ - s3().createBucket({ Bucket: messageTemplateBucket }).promise(), - s3().createBucket({ Bucket: internalBucket }).promise() - ]); - - const event = cloneDeep(mur); - event.config.buckets.internal = internalBucket; - event.config.collection.provider_path = '/allData/ghrsst/data/GDS2/L4/GLOB/JPL/MUR/v4.1/2017/(20[1-3])'; // eslint-disable-line max-len - event.config.queueUrl = await createQueue(); - event.config.templateUri = await uploadMessageTemplate(messageTemplateBucket); - event.config.useQueue = true; - - try { - const output = await discoverGranules(event); - t.is(output.granules.length, 3); - } - catch (e) { - if (e.message.includes('getaddrinfo ENOTFOUND')) { - t.pass('Ignoring this test. Test server seems to be down'); - } - else t.fail(e); - } - finally { - await Promise.all([ - sqs().deleteQueue({ QueueUrl: event.config.queueUrl }).promise(), - recursivelyDeleteS3Bucket(internalBucket), - recursivelyDeleteS3Bucket(messageTemplateBucket) - ]); - } -}); diff --git a/cumulus/tasks/discover-pdrs/index.js b/cumulus/tasks/discover-pdrs/index.js index 11e4518d1f0..0aa6b133d98 100644 --- a/cumulus/tasks/discover-pdrs/index.js +++ b/cumulus/tasks/discover-pdrs/index.js @@ -2,7 +2,6 @@ const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js'); const get = require('lodash.get'); -const ProviderNotFound = require('@cumulus/common/errors').ProviderNotFound; const pdr = require('@cumulus/ingest/pdr'); const errors = require('@cumulus/common/errors'); const log = require('@cumulus/common/log'); @@ -17,82 +16,59 @@ const local = require('@cumulus/common/local-helpers'); function discoverPdrs(event) { try { const config = get(event, 'config', {}); - const queue = get(config, 'useQueue', true); const stack = config.stack; const bucket = config.bucket; - const queueUrl = config.queueUrl; - const templateUri = config.templateUri; const collection = config.collection; const provider = config.provider; - const output = {}; - log.info('Received the provider', { provider: get(provider, 'id') }); - if (!provider) { - const err = new ProviderNotFound('Provider info not provided'); - log.error(err); - return Promise.reject(err); - } - - const Discover = pdr.selector('discover', provider.protocol, queue); + const Discover = pdr.selector('discover', provider.protocol); const discover = new Discover( stack, bucket, collection, - provider, - queueUrl, - templateUri + provider ); log.debug('Starting PDR discovery'); - return discover.discover().then((pdrs) => { - if (queue) { - output.pdrs_found = pdrs.length; - } - else { - output.pdrs = pdrs; - } - - if (discover.connected) { - discover.end(); - log.debug(`Ending ${provider.protocol} connection`); - } - - return output; - }) - .catch((e) => { - log.error(e); + return discover.discover() + .then((pdrs) => { + if (discover.connected) discover.end(); + return { pdrs }; + }) + .catch((e) => { + log.error(e); - if (discover.connected) { - discover.end(); - log.debug(`Ending ${provider.protocol} connection`); - } + if (discover.connected) { + discover.end(); + log.debug(`Ending ${provider.protocol} connection`); + } - if (e.toString().includes('ECONNREFUSED')) { - const err = new errors.RemoteResourceError('Connection Refused'); - log.error(err); - throw err; - } - else if (e.message.includes('Please 
login with USER and PASS')) { - const err = new errors.FTPError('Login incorrect'); - log.error(err); - throw err; - } - else if (e.details && e.details.status === 'timeout') { - const err = new errors.ConnectionTimeout('connection Timed out'); - log.error(err); - throw err; - } - else if (e.details && e.details.status === 'notfound') { - const err = new errors.HostNotFound(`${e.details.url} not found`); - log.error(err); - throw err; - } + if (e.toString().includes('ECONNREFUSED')) { + const err = new errors.RemoteResourceError('Connection Refused'); + log.error(err); + throw err; + } + else if (e.message.includes('Please login with USER and PASS')) { + const err = new errors.FTPError('Login incorrect'); + log.error(err); + throw err; + } + else if (e.details && e.details.status === 'timeout') { + const err = new errors.ConnectionTimeout('connection Timed out'); + log.error(err); + throw err; + } + else if (e.details && e.details.status === 'notfound') { + const err = new errors.HostNotFound(`${e.details.url} not found`); + log.error(err); + throw err; + } - throw e; - }); + throw e; + }); } catch (e) { log.error(e); @@ -118,4 +94,4 @@ exports.handler = handler; local.justLocalRun(() => { const payload = require('@cumulus/test-data/cumulus_messages/discover-pdrs.json'); // eslint-disable-line global-require, max-len handler(payload, {}, (e, r) => console.log(e, r)); -}); \ No newline at end of file +}); diff --git a/cumulus/tasks/discover-pdrs/package.json b/cumulus/tasks/discover-pdrs/package.json index fca9f6ebb81..7789183408b 100644 --- a/cumulus/tasks/discover-pdrs/package.json +++ b/cumulus/tasks/discover-pdrs/package.json @@ -53,6 +53,8 @@ "@ava/babel-preset-stage-4": "^1.1.0", "@ava/babel-preset-transform-test-files": "^3.0.0", "ava": "^0.21.0", + "fs-extra": "^5.0.0", + "lodash": "^4.17.5", "sinon": "^2.0.0-pre.5" } } diff --git a/cumulus/tasks/discover-pdrs/schemas/config.json b/cumulus/tasks/discover-pdrs/schemas/config.json index 5f60101f78e..bc70033110c 100644 --- a/cumulus/tasks/discover-pdrs/schemas/config.json +++ b/cumulus/tasks/discover-pdrs/schemas/config.json @@ -9,16 +9,6 @@ "stack" ], "properties": { - "useQueue": { - "type": "boolean" - }, - "queueUrl": { - "type": "string" - }, - "templateUri": { - "type": "string", - "description": "the S3 uri to the cumulus-message template for the parse pdr workflow" - }, "stack": { "description": "The name of the Task's CloudFormation Task, useful as a prefix", "type": "string" diff --git a/cumulus/tasks/discover-pdrs/schemas/output.json b/cumulus/tasks/discover-pdrs/schemas/output.json index 58f3f6d19d7..11d03cb2ecb 100644 --- a/cumulus/tasks/discover-pdrs/schemas/output.json +++ b/cumulus/tasks/discover-pdrs/schemas/output.json @@ -2,23 +2,23 @@ "title": "DiscoverPdrsOutput", "description": "Describes the output produced by the discover-pdrs task", "type": "object", - "required": ["pdrs", "pdrs_found"], + "required": ["pdrs"], "additionalProperties": false, "properties": { - "pdrs_found": { "type": "integer" }, "pdrs": { "type": "array", "items": { "type": "object", "additionalProperties": false, + "required": [ "name", "path" ], "properties": { "name": { "type": "string" }, - "type": { "type": "integer" }, - "size": { "type": "string" }, - "time": { "type": "string" }, - "owner": { "type": "string" }, - "group": { "type": "string" }, - "path": { "type": "string" } + "path": { "type": "string" }, + "size": { "type": "integer" }, + "time": { + "description": "The number of milliseconds since January 1, 1970, 00:00:00 UTC", + 
"type": "integer" + } } } } diff --git a/cumulus/tasks/discover-pdrs/tests/.eslintrc.json b/cumulus/tasks/discover-pdrs/tests/.eslintrc.json new file mode 100644 index 00000000000..ada42bca77f --- /dev/null +++ b/cumulus/tasks/discover-pdrs/tests/.eslintrc.json @@ -0,0 +1,5 @@ +{ + "rules": { + "no-param-reassign": "off" + } +} diff --git a/cumulus/tasks/discover-pdrs/tests/fixtures/full_message.json b/cumulus/tasks/discover-pdrs/tests/fixtures/full_message.json deleted file mode 100644 index d8237b79519..00000000000 --- a/cumulus/tasks/discover-pdrs/tests/fixtures/full_message.json +++ /dev/null @@ -1,66 +0,0 @@ -{ - "workflow_config": { - "DiscoverPdrs": { - "useQueue": false, - "stack": "{{$.cumulus_meta.stack}}", - "provider": "{{$.meta.provider}}", - "buckets": "{{$.cumulus_meta.buckets}}", - "collection": "{{$.meta.collection}}" - } - }, - "cumulus_meta": { - "stack": "myStack", - "buckets": { - "internal": "ghrc-r-deploy", - "private": "ghrc-r-private", - "protected": "ghrc-r-protected", - "public": "ghrc-r-public" - }, - "task": "DiscoverPdrs", - "message_source": "local", - "id": "id-1234" - }, - "meta": { - "provider": { - "id": "MUR", - "globalConnectionLimit": 10, - "protocol": "ftp", - "host": "podaac-ftp.jpl.nasa.gov" - }, - "collection": { - "dataType": "MOD09GQ", - "name": "MOD09GQ", - "version": "006", - "process": "modis", - "provider_path": "/TEST_B/Cumulus/PDR/TEST_CASES", - "granuleId": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf", - "granuleIdExtraction": "(MOD09GQ\\.(.*))\\.hdf", - "files": [ - { - "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$", - "bucket": "protected", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf" - }, - { - "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf\\.met$", - "bucket": "private", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf.met" - }, - { - "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.meta\\.xml$", - "bucket": "protected", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.meta.xml" - }, - { - "regex": "^MOD09GQ\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$", - "bucket": "public", - "sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104_1.jpg" - } - ] - } - }, - "payload": { - "event": 1 - } -} diff --git a/cumulus/tasks/discover-pdrs/tests/fixtures/input.json b/cumulus/tasks/discover-pdrs/tests/fixtures/input.json index 03655017b2b..9be4bb3b2e7 100644 --- a/cumulus/tasks/discover-pdrs/tests/fixtures/input.json +++ b/cumulus/tasks/discover-pdrs/tests/fixtures/input.json @@ -1,9 +1,6 @@ { "config": { - "useQueue": false, "stack": "lpdaac-cumulus-phaseIII", - "queueUrl": "http://somequeue.com", - "templateUri": "s3://path/to/template.json", "provider": { "id": "MUR", "globalConnectionLimit": 10, diff --git a/cumulus/tasks/discover-pdrs/tests/index.js b/cumulus/tasks/discover-pdrs/tests/index.js index 8f32e6ae2a8..2ca27900425 100644 --- a/cumulus/tasks/discover-pdrs/tests/index.js +++ b/cumulus/tasks/discover-pdrs/tests/index.js @@ -2,59 +2,54 @@ const test = require('ava'); const path = require('path'); -const sinon = require('sinon'); -const { - ProviderNotFound, - FTPError, - RemoteResourceError -} = require('@cumulus/common/errors'); -const { S3 } = require('@cumulus/ingest/aws'); +const fs = require('fs-extra'); +const { FTPError, RemoteResourceError } = require('@cumulus/common/errors'); +const { cloneDeep } = require('lodash'); const { discoverPdrs } = 
require('../index'); const input = require('./fixtures/input.json'); -const aws = require('@cumulus/common/aws'); -const testUtils = require('@cumulus/common/test-utils'); - -test('error when provider info is missing', (t) => - discoverPdrs({}) - .then(t.fail) - .catch((e) => t.true(e instanceof ProviderNotFound))); +const { recursivelyDeleteS3Bucket, s3 } = require('@cumulus/common/aws'); +const { + findGitRepoRootDirectory, + randomString, + validateConfig, + validateOutput +} = require('@cumulus/common/test-utils'); -test('test pdr discovery with FTP assuming all PDRs are new', (t) => { - const testInput = Object.assign({}, input); - testInput.config.provider = { +test('test pdr discovery with FTP assuming all PDRs are new', async (t) => { + const event = cloneDeep(input); + event.config.bucket = randomString(); + event.config.collection.provider_path = '/pdrs'; + event.config.provider = { id: 'MODAPS', protocol: 'ftp', host: 'localhost', username: 'testuser', password: 'testpass' }; - testInput.config.collection.provider_path = '/pdrs'; - testInput.config.useQueue = false; - const ps = { - 'MYD13A1_5_grans.PDR': false, - 'PDN.ID1611071307.PDR': false, - 'PDN.ID1611081200.PDR': false - }; - sinon.stub(S3, 'fileExists').callsFake((b, k) => ps[path.basename(k)]); + await validateConfig(t, event.config); - return discoverPdrs(testInput, {}) - .then((result) => { - S3.fileExists.restore(); - t.is(result.pdrs.length, 4); - }) - .catch((err) => { - S3.fileExists.restore(); - if (err instanceof RemoteResourceError) { - t.pass('ignoring this test. Test server seems to be down'); - } - else throw err; - }); + await s3().createBucket({ Bucket: event.config.bucket }).promise(); + + try { + const output = await discoverPdrs(event); + await validateOutput(t, output); + t.is(output.pdrs.length, 4); + } + catch (err) { + if (err instanceof RemoteResourceError) { + t.pass('ignoring this test. 
Test server seems to be down'); + } + else t.fail(err); + } + finally { + await recursivelyDeleteS3Bucket(event.config.bucket); + } }); -test('test pdr discovery with FTP invalid user/pass', (t) => { +test('test pdr discovery with FTP invalid user/pass', async (t) => { const provider = { id: 'MODAPS', protocol: 'ftp', @@ -63,10 +58,12 @@ test('test pdr discovery with FTP invalid user/pass', (t) => { password: 'testpass' }; - const newPayload = Object.assign({}, input); + const newPayload = cloneDeep(input); newPayload.config.provider = provider; newPayload.input = {}; + await validateConfig(t, newPayload.config); + return discoverPdrs(newPayload, {}) .then(t.fail) .catch((e) => { @@ -80,20 +77,22 @@ test('test pdr discovery with FTP invalid user/pass', (t) => { }); }); -test('test pdr discovery with FTP connection refused', (t) => { +test('test pdr discovery with FTP connection refused', async (t) => { const provider = { id: 'MODAPS', protocol: 'ftp', host: 'localhost', - port: '30', // using port that doesn't exist to nonresponsiveness + port: 30, // using port that doesn't exist to nonresponsiveness username: 'testuser1', password: 'testpass' }; - const newPayload = Object.assign({}, input); + const newPayload = cloneDeep(input); newPayload.config.provider = provider; newPayload.input = {}; + await validateConfig(t, newPayload.config); + return discoverPdrs(newPayload, {}) .then(t.fail) .catch((e) => { @@ -101,7 +100,7 @@ test('test pdr discovery with FTP connection refused', (t) => { }); }); -test('test pdr discovery with FTP assuming some PDRs are new', (t) => { +test('test pdr discovery with FTP assuming some PDRs are new', async (t) => { const provider = { id: 'MODAPS', protocol: 'ftp', @@ -110,15 +109,14 @@ test('test pdr discovery with FTP assuming some PDRs are new', (t) => { password: 'testpass' }; - const newPayload = Object.assign({}, input); + const newPayload = cloneDeep(input); newPayload.config.provider = provider; - newPayload.config.useQueue = false; newPayload.config.collection.provider_path = '/pdrs'; newPayload.input = {}; - const internalBucketName = testUtils.randomString(); + const internalBucketName = randomString(); newPayload.config.bucket = internalBucketName; - return aws.s3().createBucket({ Bucket: internalBucketName }).promise() + return s3().createBucket({ Bucket: internalBucketName }).promise() .then(() => { const Key = [ newPayload.config.stack, @@ -126,7 +124,7 @@ test('test pdr discovery with FTP assuming some PDRs are new', (t) => { 'PDN.ID1611071307.PDR' ].join('/'); - return aws.s3().putObject({ + return s3().putObject({ Bucket: internalBucketName, Key, Body: 'PDN.ID1611071307.PDR' @@ -135,90 +133,163 @@ test('test pdr discovery with FTP assuming some PDRs are new', (t) => { .then(() => discoverPdrs(newPayload, {})) .then((output) => { t.is(output.pdrs.length, 3); - return aws.recursivelyDeleteS3Bucket(internalBucketName); + return validateOutput(t, output); }) + .then(() => recursivelyDeleteS3Bucket(internalBucketName)) .catch((e) => { if (e instanceof RemoteResourceError) { t.pass('ignoring this test. 
Test server seems to be down'); - return aws.recursivelyDeleteS3Bucket(internalBucketName); + return recursivelyDeleteS3Bucket(internalBucketName); } - return aws.recursivelyDeleteS3Bucket(internalBucketName).then(t.fail); + return recursivelyDeleteS3Bucket(internalBucketName).then(t.fail); }); }); -test('test pdr discovery with HTTP assuming some PDRs are new', (t) => { - const provider = { - id: 'MODAPS', - protocol: 'http', - host: 'http://localhost:8080' - }; +test('test pdr discovery with HTTP assuming some PDRs are new', async (t) => { + const internalBucketName = randomString(); + const providerPath = randomString(); - const newPayload = Object.assign({}, input); - newPayload.config.provider = provider; - newPayload.config.useQueue = false; - newPayload.config.collection.provider_path = '/'; - newPayload.input = {}; + // Figure out the directory paths that we're working with + const gitRepoRootDirectory = await findGitRepoRootDirectory(); + const testDataDirectory = path.join(gitRepoRootDirectory, 'packages', 'test-data', 'pdrs'); + const providerPathDirectory = path.join(gitRepoRootDirectory, 'tmp-test-data', providerPath); - const internalBucketName = testUtils.randomString(); - newPayload.config.bucket = internalBucketName; - return aws.s3().createBucket({ Bucket: internalBucketName }).promise() - .then(() => aws.s3().putObject({ + // Create providerPathDirectory and internal bucket + await Promise.all([ + fs.ensureDir(providerPathDirectory), + s3().createBucket({ Bucket: internalBucketName }).promise() + ]); + + try { + // Copy the PDRs to the HTTP directory + const pdrFilenames = await fs.readdir(testDataDirectory); + + const oldPdr = pdrFilenames[0]; + const newPdrs = pdrFilenames.slice(1); + + await Promise.all(pdrFilenames.map((pdrFilename) => fs.copy( + path.join(testDataDirectory, pdrFilename), + path.join(providerPathDirectory, pdrFilename)))); + + // Build the event + const event = cloneDeep(input); + event.config.bucket = internalBucketName; + event.config.provider = { + id: 'MODAPS', + protocol: 'http', + host: 'http://localhost:8080' + }; + event.config.collection.provider_path = providerPath; + event.input = {}; + + // Mark one of the PDRs as not new + await s3().putObject({ Bucket: internalBucketName, - Key: 'lpdaac-cumulus-phaseIII/pdrs/pdrs/PDN.ID1611071307.PDR', - Body: 'PDN.ID1611071307.PDR' - }).promise()) - .then(() => discoverPdrs(newPayload, {})) - .then((output) => { - t.is(output.pdrs.length, 2); - return aws.recursivelyDeleteS3Bucket(internalBucketName); - }) - .catch((e) => { + // 'pdrs' is the default 'folder' value in the Discover contructor + Key: `${event.config.stack}/pdrs/${oldPdr}`, + Body: 'Pretend this is a PDR' + }).promise(); + + await validateConfig(t, event.config); + let output; + try { + output = await discoverPdrs(event, {}); + + await validateOutput(t, output); + + t.is(output.pdrs.length, 3); + const names = output.pdrs.map((p) => p.name); + newPdrs.forEach((pdr) => t.true(names.includes(pdr))); + } + catch (e) { if (e instanceof RemoteResourceError) { - t.pass('ignoring this test. Test server seems to be down'); - return aws.recursivelyDeleteS3Bucket(internalBucketName); + t.pass('Ignoring this test. 
Test server seems to be down'); } - return aws.recursivelyDeleteS3Bucket(internalBucketName).then(t.fail); - }); + else t.fail(e); + } + } + finally { + // Clean up + await Promise.all([ + recursivelyDeleteS3Bucket(internalBucketName), + fs.remove(providerPathDirectory) + ]); + } }); -test('test pdr discovery with SFTP assuming some PDRs are new', (t) => { - const provider = { - id: 'MODAPS', - protocol: 'sftp', - host: 'localhost', - port: 2222, - username: 'user', - password: 'password' - }; +test('test pdr discovery with SFTP assuming some PDRs are new', async (t) => { + const internalBucketName = randomString(); + const providerPath = randomString(); - const newPayload = Object.assign({}, input); - newPayload.config.provider = provider; - newPayload.config.useQueue = false; - newPayload.config.collection.provider_path = 'test-data/pdrs'; - newPayload.input = {}; + // Figure out the directory paths that we're working with + const gitRepoRootDirectory = await findGitRepoRootDirectory(); + const testDataDirectory = path.join(gitRepoRootDirectory, 'packages', 'test-data', 'pdrs'); + const providerPathDirectory = path.join(gitRepoRootDirectory, 'tmp-test-data', providerPath); - const internalBucketName = testUtils.randomString(); - newPayload.config.bucket = internalBucketName; - return aws.s3().createBucket({ Bucket: internalBucketName }).promise() - .then(() => aws.s3().putObject({ + // Create providerPathDirectory and internal bucket + await Promise.all([ + fs.ensureDir(providerPathDirectory), + s3().createBucket({ Bucket: internalBucketName }).promise() + ]); + + try { + // Copy the PDRs to the SFTP directory + const pdrFilenames = await fs.readdir(testDataDirectory); + + const oldPdr = pdrFilenames[0]; + const newPdrs = pdrFilenames.slice(1); + + await Promise.all(pdrFilenames.map((pdrFilename) => fs.copy( + path.join(testDataDirectory, pdrFilename), + path.join(providerPathDirectory, pdrFilename)))); + + // Build the event + const event = cloneDeep(input); + event.config.bucket = internalBucketName; + event.config.provider = { + id: 'MODAPS', + protocol: 'sftp', + host: 'localhost', + port: 2222, + username: 'user', + password: 'password' + }; + // The test-data prefix is required because of the way that the sftp + // container is configured in docker-compose.yml. + event.config.collection.provider_path = `test-data/${providerPath}`; + event.input = {}; + + // Mark one of the PDRs as not new + await s3().putObject({ Bucket: internalBucketName, - Key: 'lpdaac-cumulus-phaseIII/pdrs/PDN.ID1611071307.PDR', - Body: 'PDN.ID1611071307.PDR' - }).promise()) - .then(() => discoverPdrs(newPayload, {})) - .then((output) => { + // 'pdrs' is the default 'folder' value in the Discover contructor + Key: `${event.config.stack}/pdrs/${oldPdr}`, + Body: 'Pretend this is a PDR' + }).promise(); + + await validateConfig(t, event.config); + let output; + try { + output = await discoverPdrs(event, {}); + + await validateOutput(t, output); + t.is(output.pdrs.length, 3); - const names = output.pdrs.map(p => p.name); - t.true(names.includes('MOD09GQ.PDR')); - t.true(names.includes('MYD13A1_5_grans.PDR')); - t.true(names.includes('PDN.ID1611081200.PDR')); - return aws.recursivelyDeleteS3Bucket(internalBucketName); - }) - .catch((e) => { + const names = output.pdrs.map((p) => p.name); + newPdrs.forEach((pdr) => t.true(names.includes(pdr))); + } + catch (e) { if (e instanceof RemoteResourceError) { - t.pass('ignoring this test. 
Test server seems to be down'); - return aws.recursivelyDeleteS3Bucket(internalBucketName); + t.pass('Ignoring this test. Test server seems to be down'); } - return aws.recursivelyDeleteS3Bucket(internalBucketName).then(t.fail); - }); + else t.fail(e); + } + } + finally { + // Clean up + await Promise.all([ + recursivelyDeleteS3Bucket(internalBucketName), + fs.remove(providerPathDirectory) + ]); + } }); diff --git a/cumulus/tasks/parse-pdr/index.js b/cumulus/tasks/parse-pdr/index.js index 2c3132ab4e7..393fe6b3f3c 100644 --- a/cumulus/tasks/parse-pdr/index.js +++ b/cumulus/tasks/parse-pdr/index.js @@ -46,12 +46,8 @@ function parsePdr(event) { return parse.ingest() .then((payload) => { - if (parse.connected) { - parse.end(); - } - - const output = Object.assign({}, event.input, payload); - return output; + if (parse.connected) parse.end(); + return Object.assign({}, event.input, payload); }) .catch((e) => { if (e.toString().includes('ECONNREFUSED')) { @@ -88,4 +84,4 @@ exports.handler = handler; justLocalRun(() => { const payload = require('@cumulus/test-data/cumulus_messages/parse-pdr.json'); // eslint-disable-line global-require, max-len handler(payload, {}, (e, r) => console.log(e, r)); -}); \ No newline at end of file +}); diff --git a/cumulus/tasks/parse-pdr/package.json b/cumulus/tasks/parse-pdr/package.json index 11ef8e53cf4..6e623a6e4d6 100644 --- a/cumulus/tasks/parse-pdr/package.json +++ b/cumulus/tasks/parse-pdr/package.json @@ -28,8 +28,7 @@ "babel-polyfill", "babel-register" ], - "color": false, - "serial": true + "color": false }, "babel": { "presets": [ @@ -57,6 +56,7 @@ "@ava/babel-preset-stage-4": "^1.1.0", "@ava/babel-preset-transform-test-files": "^3.0.0", "ava": "^0.21.0", + "fs-extra": "^5.0.0", "lodash.clonedeep": "^4.5.0", "proxyquire": "^1.8.0", "sinon": "^2.0.0-pre.5" diff --git a/cumulus/tasks/parse-pdr/tests/parse_pdrs_test.js b/cumulus/tasks/parse-pdr/tests/parse_pdrs_test.js index e46c32e882c..227b2ce9031 100644 --- a/cumulus/tasks/parse-pdr/tests/parse_pdrs_test.js +++ b/cumulus/tasks/parse-pdr/tests/parse_pdrs_test.js @@ -1,24 +1,24 @@ 'use strict'; -const aws = require('@cumulus/common/aws'); -const test = require('ava'); -const testUtils = require('@cumulus/common/test-utils'); const errors = require('@cumulus/common/errors'); +const fs = require('fs-extra'); const modis = require('@cumulus/test-data/payloads/new-message-schema/parse.json'); +const path = require('path'); +const test = require('ava'); + +const { recursivelyDeleteS3Bucket, s3 } = require('@cumulus/common/aws'); const { cloneDeep } = require('lodash'); +const { + findGitRepoRootDirectory, + randomString, + validateConfig, + validateInput, + validateOutput +} = require('@cumulus/common/test-utils'); const { parsePdr } = require('../index'); -test('error when provider info is missing', (t) => { - const newPayload = Object.assign({}, modis); - delete newPayload.config.provider; - - return parsePdr(newPayload) - .then(t.fail) - .catch((e) => t.true(e instanceof errors.ProviderNotFound)); -}); - -test('parse PDR from FTP endpoint', (t) => { +test('parse PDR from FTP endpoint', async (t) => { const provider = { id: 'MODAPS', protocol: 'ftp', @@ -31,58 +31,150 @@ test('parse PDR from FTP endpoint', (t) => { const newPayload = cloneDeep(modis); newPayload.config.provider = provider; - newPayload.config.useQueue = false; - const internalBucketName = testUtils.randomString(); + const internalBucketName = randomString(); newPayload.config.bucket = internalBucketName; - return aws.s3().createBucket({ 
Bucket: internalBucketName }).promise() + await validateConfig(t, newPayload.config); + + return s3().createBucket({ Bucket: internalBucketName }).promise() .then(() => parsePdr(newPayload)) .then((output) => { t.is(output.granules.length, output.granulesCount); t.is(output.pdr.name, pdrName); t.is(output.filesCount, 2); - return aws.recursivelyDeleteS3Bucket(internalBucketName); + return output; }) + .then((output) => validateOutput(t, output)) + .then(() => recursivelyDeleteS3Bucket(internalBucketName)) .catch((err) => { if (err instanceof errors.RemoteResourceError || err.code === 'AllAccessDisabled') { t.pass('ignoring this test. Test server seems to be down'); } else t.fail(err); - return aws.recursivelyDeleteS3Bucket(internalBucketName); + return recursivelyDeleteS3Bucket(internalBucketName); }); }); -test('parse PDR from HTTP endpoint', (t) => { - const provider = { +test('parse PDR from HTTP endpoint', async (t) => { + const internalBucketName = randomString(); + const providerPath = randomString(); + + // Figure out the directory paths that we're working with + const gitRepoRootDirectory = await findGitRepoRootDirectory(); + const providerPathDirectory = path.join(gitRepoRootDirectory, 'tmp-test-data', providerPath); + + // Create providerPathDirectory and internal bucket + await Promise.all([ + fs.ensureDir(providerPathDirectory), + s3().createBucket({ Bucket: internalBucketName }).promise() + ]); + + const pdrName = 'MOD09GQ.PDR'; + + await fs.copy( + path.join(gitRepoRootDirectory, 'packages', 'test-data', 'pdrs', pdrName), + path.join(providerPathDirectory, pdrName)); + + const newPayload = cloneDeep(modis); + newPayload.config.bucket = internalBucketName; + newPayload.config.provider = { id: 'MODAPS', protocol: 'http', host: 'http://localhost:8080' }; + newPayload.input = { + pdr: { + name: pdrName, + path: `/${providerPath}` + } + }; + + await validateInput(t, newPayload.input); + await validateConfig(t, newPayload.config); + + try { + const output = await parsePdr(newPayload); + await validateOutput(t, output); + t.is(output.granules.length, output.granulesCount); + t.is(output.pdr.name, pdrName); + t.is(output.filesCount, 2); + } + catch (err) { + if (err instanceof errors.RemoteResourceError || err.code === 'AllAccessDisabled') { + t.pass('ignoring this test. 
Test server seems to be down'); + } + else t.fail(err); + } + finally { + // Clean up + await Promise.all([ + recursivelyDeleteS3Bucket(internalBucketName), + fs.remove(providerPathDirectory) + ]); + } +}); + +test('parse PDR from SFTP endpoint', async (t) => { + const internalBucketName = randomString(); + const providerPath = randomString(); + + // Figure out the directory paths that we're working with + const gitRepoRootDirectory = await findGitRepoRootDirectory(); + const providerPathDirectory = path.join(gitRepoRootDirectory, 'tmp-test-data', providerPath); + + // Create providerPathDirectory and internal bucket + await Promise.all([ + fs.ensureDir(providerPathDirectory), + s3().createBucket({ Bucket: internalBucketName }).promise() + ]); const pdrName = 'MOD09GQ.PDR'; - const newPayload = cloneDeep(modis); - newPayload.config.provider = provider; - newPayload.config.useQueue = false; + await fs.copy( + path.join(gitRepoRootDirectory, 'packages', 'test-data', 'pdrs', pdrName), + path.join(providerPathDirectory, pdrName)); - const internalBucketName = testUtils.randomString(); + const newPayload = cloneDeep(modis); newPayload.config.bucket = internalBucketName; + newPayload.config.provider = { + id: 'MODAPS', + protocol: 'sftp', + host: 'localhost', + port: 2222, + username: 'user', + password: 'password' + }; + newPayload.input = { + pdr: { + name: pdrName, + // The test-data prefix is required because of the way that the sftp + // container is configured in docker-compose.yml. + path: `/test-data/${providerPath}` + } + }; - return aws.s3().createBucket({ Bucket: internalBucketName }).promise() - .then(() => parsePdr(newPayload)) - .then((output) => { - t.is(output.granules.length, output.granulesCount); - t.is(output.pdr.name, pdrName); - t.is(output.filesCount, 2); + await validateInput(t, newPayload.input); + await validateConfig(t, newPayload.config); - return aws.recursivelyDeleteS3Bucket(internalBucketName); - }) - .catch((err) => { - if (err instanceof errors.RemoteResourceError || err.code === 'AllAccessDisabled') { - t.pass('ignoring this test. Test server seems to be down'); - } - else t.fail(err); - return aws.recursivelyDeleteS3Bucket(internalBucketName); - }); + try { + const output = await parsePdr(newPayload); + await validateOutput(t, output); + t.is(output.granules.length, output.granulesCount); + t.is(output.pdr.name, pdrName); + t.is(output.filesCount, 2); + } + catch (err) { + if (err instanceof errors.RemoteResourceError || err.code === 'AllAccessDisabled') { + t.pass('ignoring this test. 
Test server seems to be down'); + } + else t.fail(err); + } + finally { + // Clean up + await Promise.all([ + recursivelyDeleteS3Bucket(internalBucketName), + fs.remove(providerPathDirectory) + ]); + } }); diff --git a/cumulus/tasks/sync-granule/index.js b/cumulus/tasks/sync-granule/index.js index 16a9a63da77..3288324010f 100644 --- a/cumulus/tasks/sync-granule/index.js +++ b/cumulus/tasks/sync-granule/index.js @@ -4,7 +4,6 @@ const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js'); const errors = require('@cumulus/common/errors'); const lock = require('@cumulus/ingest/lock'); const granule = require('@cumulus/ingest/granule'); -const local = require('@cumulus/common/local-helpers'); const log = require('@cumulus/common/log'); /** @@ -108,9 +107,3 @@ exports.syncGranule = function syncGranule(event) { exports.handler = function handler(event, context, callback) { cumulusMessageAdapter.runCumulusTask(exports.syncGranule, event, context, callback); }; - -// use node index.js local to invoke this -local.justLocalRun(() => { - const payload = require('@cumulus/test-data/cumulus_messages/sync-granule.json'); // eslint-disable-line global-require, max-len - exports.handler(payload, {}, (e, r) => console.log(e, r)); -}); diff --git a/cumulus/tasks/sync-granule/package.json b/cumulus/tasks/sync-granule/package.json index ad39d30e29d..2c5a4697435 100644 --- a/cumulus/tasks/sync-granule/package.json +++ b/cumulus/tasks/sync-granule/package.json @@ -56,6 +56,8 @@ "@ava/babel-preset-stage-4": "^1.1.0", "@ava/babel-preset-transform-test-files": "^3.0.0", "ava": "^0.21.0", + "fs-extra": "^5.0.0", + "lodash": "^4.17.5", "proxyquire": "^1.8.0", "sinon": "^2.0.0-pre.5" } diff --git a/cumulus/tasks/sync-granule/schemas/config.json b/cumulus/tasks/sync-granule/schemas/config.json index be01095a486..17c150a7fc1 100644 --- a/cumulus/tasks/sync-granule/schemas/config.json +++ b/cumulus/tasks/sync-granule/schemas/config.json @@ -36,10 +36,8 @@ }, "collection": { "type": "object", - "description": "", "required": [ "process", - "url_path", "files", "name", "granuleIdExtraction", diff --git a/cumulus/tasks/sync-granule/tests/sync_granule_test.js b/cumulus/tasks/sync-granule/tests/sync_granule_test.js index f928389900b..ffc7184273a 100644 --- a/cumulus/tasks/sync-granule/tests/sync_granule_test.js +++ b/cumulus/tasks/sync-granule/tests/sync_granule_test.js @@ -1,28 +1,25 @@ 'use strict'; -const test = require('ava'); +const aws = require('@cumulus/common/aws'); const errors = require('@cumulus/common/errors'); +const fs = require('fs-extra'); +const path = require('path'); const payload = require('@cumulus/test-data/payloads/new-message-schema/ingest.json'); -const payloadChecksumFile = require( - '@cumulus/test-data/payloads/new-message-schema/ingest-checksumfile.json' -); -const testUtils = require('@cumulus/common/test-utils'); -const aws = require('@cumulus/common/aws'); - -const { syncGranule } = require('../index'); +const test = require('ava'); -test('error when provider info is missing', (t) => { - const newPayload = Object.assign({}, payload); - delete newPayload.config.provider; +const payloadChecksumFile = require('@cumulus/test-data/payloads/new-message-schema/ingest-checksumfile.json'); // eslint-disable-line max-len - return syncGranule(newPayload) - .then(t.fail) - .catch((e) => { - t.true(e instanceof errors.ProviderNotFound); - }); -}); +const { + findGitRepoRootDirectory, + randomString, + validateConfig, + validateInput, + validateOutput +} = 
require('@cumulus/common/test-utils'); +const { cloneDeep } = require('lodash'); +const { syncGranule } = require('../index'); -test('download Granule from FTP endpoint', (t) => { +test('download Granule from FTP endpoint', async (t) => { const provider = { id: 'MODAPS', protocol: 'ftp', @@ -31,18 +28,22 @@ test('download Granule from FTP endpoint', (t) => { password: 'testpass' }; - const newPayload = Object.assign({}, payload); + const newPayload = cloneDeep(payload); newPayload.config.provider = provider; - const protectedBucketName = testUtils.randomString(); - const internalBucketName = testUtils.randomString(); + const protectedBucketName = randomString(); + const internalBucketName = randomString(); newPayload.config.buckets.protected = protectedBucketName; newPayload.config.buckets.internal = internalBucketName; + await validateInput(t, newPayload.input); + await validateConfig(t, newPayload.config); + return aws.s3().createBucket({ Bucket: protectedBucketName }).promise() .then(() => aws.s3().createBucket({ Bucket: internalBucketName }).promise()) .then(() => syncGranule(newPayload)) + .then((output) => validateOutput(t, output).then(() => output)) .then((output) => { t.is(output.granules.length, 1); t.is(output.granules[0].files.length, 1); @@ -61,87 +62,126 @@ test('download Granule from FTP endpoint', (t) => { }); }); -test('download Granule from HTTP endpoint', (t) => { - const provider = { +test('download Granule from HTTP endpoint', async (t) => { + const granuleUrlPath = randomString(); + + // Figure out the directory paths that we're working with + const gitRepoRootDirectory = await findGitRepoRootDirectory(); + const tmpTestDataDirectory = path.join(gitRepoRootDirectory, 'tmp-test-data', granuleUrlPath); + + const granuleFilename = 'MOD09GQ.A2017224.h27v08.006.2017227165029.hdf'; + + const event = cloneDeep(payload); + event.config.buckets.internal = randomString(); + event.config.buckets.protected = randomString(); + event.config.provider = { id: 'MODAPS', protocol: 'http', host: 'http://localhost:8080' }; + event.input.granules[0].files[0].path = `/${granuleUrlPath}`; + + await validateConfig(t, event.config); + await validateInput(t, event.input); + + await fs.ensureDir(tmpTestDataDirectory); + try { + await Promise.all([ + fs.copy( + path.join(gitRepoRootDirectory, 'packages', 'test-data', 'granules', granuleFilename), + path.join(tmpTestDataDirectory, granuleFilename)), + aws.s3().createBucket({ Bucket: event.config.buckets.internal }).promise(), + aws.s3().createBucket({ Bucket: event.config.buckets.protected }).promise() + ]); + + const output = await syncGranule(event); + + await validateOutput(t, output); + t.is(output.granules.length, 1); + t.is(output.granules[0].files.length, 1); + t.is( + output.granules[0].files[0].filename, + `s3://${event.config.buckets.protected}/${granuleFilename}` + ); + } + catch (e) { + if (e instanceof errors.RemoteResourceError) { + t.pass('ignoring this test. 
Test server seems to be down'); + } + else t.fail(e); + } + finally { + await Promise.all([ + fs.remove(tmpTestDataDirectory), + aws.recursivelyDeleteS3Bucket(event.config.buckets.internal), + aws.recursivelyDeleteS3Bucket(event.config.buckets.protected) + ]); + } +}); - const newPayload = Object.assign({}, payload); - newPayload.config.provider = provider; - - const protectedBucketName = testUtils.randomString(); - const internalBucketName = testUtils.randomString(); +test('download granule over HTTP with checksum in file', async (t) => { + const granuleUrlPath = randomString(); - newPayload.config.buckets.protected = protectedBucketName; - newPayload.config.buckets.internal = internalBucketName; + // Figure out the directory paths that we're working with + const gitRepoRootDirectory = await findGitRepoRootDirectory(); + const tmpTestDataDirectory = path.join(gitRepoRootDirectory, 'tmp-test-data', granuleUrlPath); - return aws.s3().createBucket({ Bucket: protectedBucketName }).promise() - .then(() => aws.s3().createBucket({ Bucket: internalBucketName }).promise()) - .then(() => syncGranule(newPayload)) - .then((output) => { - t.is(output.granules.length, 1); - t.is(output.granules[0].files.length, 1); - t.is( - output.granules[0].files[0].filename, - `s3://${protectedBucketName}/MOD09GQ.A2017224.h27v08.006.2017227165029.hdf` - ); + const granuleFilename = '20160115-MODIS_T-JPL-L2P-T2016015000000.L2_LAC_GHRSST_N-v01.nc.bz2'; + const checksumFilename = '20160115-MODIS_T-JPL-L2P-T2016015000000.L2_LAC_GHRSST_N-v01.nc.bz2.md5'; - return aws.recursivelyDeleteS3Bucket(internalBucketName); - }) - .catch((e) => { - if (e instanceof errors.RemoteResourceError) { - t.pass('ignoring this test. Test server seems to be down'); - } - else throw e; - }); -}); - -test('download Granule with checksum in file', (t) => { - const provider = { + const event = cloneDeep(payloadChecksumFile); + event.config.buckets.internal = randomString(); + event.config.buckets.private = randomString(); + event.config.buckets.protected = randomString(); + event.config.provider = { id: 'MODAPS', protocol: 'http', host: 'http://localhost:8080' }; - - const newPayload = Object.assign({}, payloadChecksumFile); - newPayload.config.provider = provider; - - const internalBucketName = testUtils.randomString(); - const privateBucketName = testUtils.randomString(); - const protectedBucketName = testUtils.randomString(); - - newPayload.config.buckets.internal = internalBucketName; - newPayload.config.buckets.private = privateBucketName; - newPayload.config.buckets.protected = protectedBucketName; - - return aws.s3().createBucket({ Bucket: protectedBucketName }).promise() - .then(() => aws.s3().createBucket({ Bucket: internalBucketName }).promise()) - .then(() => aws.s3().createBucket({ Bucket: privateBucketName }).promise()) - .then(() => syncGranule(newPayload)) - .then((output) => { - t.is(output.granules.length, 1); - t.is(output.granules[0].files.length, 1); - t.is( - output.granules[0].files[0].filename, - `s3://${privateBucketName}/20160115-MODIS_T-JPL-L2P-T2016015000000.L2_LAC_GHRSST_N-v01.nc.bz2` // eslint-disable-line max-len - ); - - return aws.recursivelyDeleteS3Bucket(protectedBucketName) - .then(() => aws.recursivelyDeleteS3Bucket(internalBucketName)) - .then(() => aws.recursivelyDeleteS3Bucket(privateBucketName)); - }) - .catch((e) => - aws.recursivelyDeleteS3Bucket(protectedBucketName) - .then(() => aws.recursivelyDeleteS3Bucket(internalBucketName)) - .then(() => aws.recursivelyDeleteS3Bucket(privateBucketName)) - 
.then(() => { - if (e instanceof errors.RemoteResourceError) { - t.pass('ignoring this test. Test server seems to be down'); - } - else throw e; - })); + event.input.granules[0].files[0].path = `/${granuleUrlPath}`; + event.input.granules[0].files[1].path = `/${granuleUrlPath}`; + + await validateConfig(t, event.config); + await validateInput(t, event.input); + + await fs.ensureDir(tmpTestDataDirectory); + try { + await Promise.all([ + fs.copy( + path.join(gitRepoRootDirectory, 'packages', 'test-data', 'granules', granuleFilename), + path.join(tmpTestDataDirectory, granuleFilename)), + fs.copy( + path.join(gitRepoRootDirectory, 'packages', 'test-data', 'granules', checksumFilename), + path.join(tmpTestDataDirectory, checksumFilename)), + aws.s3().createBucket({ Bucket: event.config.buckets.internal }).promise(), + aws.s3().createBucket({ Bucket: event.config.buckets.private }).promise(), + aws.s3().createBucket({ Bucket: event.config.buckets.protected }).promise() + ]); + + const output = await syncGranule(event); + + await validateOutput(t, output); + t.is(output.granules.length, 1); + t.is(output.granules[0].files.length, 1); + t.is( + output.granules[0].files[0].filename, + `s3://${event.config.buckets.private}/${granuleFilename}` // eslint-disable-line max-len + ); + } + catch (e) { + if (e instanceof errors.RemoteResourceError) { + t.pass('ignoring this test. Test server seems to be down'); + } + else t.fail(e); + } + finally { + await Promise.all([ + fs.remove(tmpTestDataDirectory), + aws.recursivelyDeleteS3Bucket(event.config.buckets.internal), + aws.recursivelyDeleteS3Bucket(event.config.buckets.private), + aws.recursivelyDeleteS3Bucket(event.config.buckets.protected) + ]); + } }); // // TODO Fix this test as part of https://bugs.earthdata.nasa.gov/browse/CUMULUS-272 diff --git a/docker-compose.yml b/docker-compose.yml index 10acfac2883..12b783eb5ec 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,13 +21,12 @@ services: ports: - "8080:80" volumes: - - "./packages/test-data:/var/www/html" + - "./tmp-test-data:/var/www/html" sftp: image: atmoz/sftp ports: - "2222:22" volumes: - - "./packages/test-data:/home/user/test-data:ro" + - "./tmp-test-data:/home/user/test-data:ro" command: user:password - diff --git a/packages/common/package.json b/packages/common/package.json index 9d446be9635..43983960f63 100644 --- a/packages/common/package.json +++ b/packages/common/package.json @@ -61,6 +61,7 @@ "expect.js": "^0.3.1", "exports-loader": "^0.6.3", "follow-redirects": "^1.2.4", + "fs-extra": "^5.0.0", "glob": "^7.1.1", "js-yaml": "^3.8.3", "lodash": "^4.13.1", @@ -76,6 +77,7 @@ "webpack-node-externals": "^1.5.4" }, "devDependencies": { + "ajv": "^5.5.2", "ava": "^0.25.0" } } diff --git a/packages/common/test-utils.js b/packages/common/test-utils.js index d83a56e3a0b..48b2dc11c7d 100644 --- a/packages/common/test-utils.js +++ b/packages/common/test-utils.js @@ -1,8 +1,13 @@ +/* eslint-disable no-console */ 'use strict'; +const Ajv = require('ajv'); const crypto = require('crypto'); +const path = require('path'); const url = require('url'); const aws = require('./aws'); +const { readFile } = require('fs'); +const fs = require('fs-extra'); /** * Generate a 40-character random string @@ -107,3 +112,110 @@ async function createQueue() { return url.format(returnedQueueUrl); } exports.createQueue = createQueue; + +/** + * Read a file and return a promise with the data + * + * Takes the same parameters as fs.readFile: + * + * 
https://nodejs.org/docs/v6.10.3/api/fs.html#fs_fs_readfile_file_options_callback + * + * @param {string|Buffer|integer} file - filename or file descriptor + * @param {any} options - encoding and flag options + * @returns {Promise} - the contents of the file + */ +function promisedReadFile(file, options) { + return new Promise((resolve, reject) => { + readFile(file, options, (err, data) => { + if (err) reject(err); + else resolve(data); + }); + }); +} + +/** + * Validate an object using json-schema + * + * Issues a test failure if there were validation errors + * + * @param {Object} t - an ava test + * @param {string} schemaFilename - the filename of the schema + * @param {Object} data - the object to be validated + * @returns {boolean} - whether the object is valid or not + */ +async function validateJSON(t, schemaFilename, data) { + const schemaName = path.basename(schemaFilename).split('.')[0]; + const schema = await promisedReadFile(schemaFilename, 'utf8').then(JSON.parse); + const ajv = new Ajv(); + const valid = ajv.validate(schema, data); + if (!valid) { + const message = `${schemaName} validation failed: ${ajv.errorsText()}`; + console.log(message); + console.log(JSON.stringify(data, null, 2)); + return t.fail(message); + } + return valid; +} + +/** + * Validate a task input object using json-schema + * + * Issues a test failure if there were validation errors + * + * @param {Object} t - an ava test + * @param {Object} data - the object to be validated + * @returns {boolean} - whether the object is valid or not + */ +async function validateInput(t, data) { + return validateJSON(t, './schemas/input.json', data); +} +exports.validateInput = validateInput; + +/** + * Validate a task config object using json-schema + * + * Issues a test failure if there were validation errors + * + * @param {Object} t - an ava test + * @param {Object} data - the object to be validated + * @returns {boolean} - whether the object is valid or not + */ +async function validateConfig(t, data) { + return validateJSON(t, './schemas/config.json', data); +} +exports.validateConfig = validateConfig; + +/** + * Validate a task output object using json-schema + * + * Issues a test failure if there were validation errors + * + * @param {Object} t - an ava test + * @param {Object} data - the object to be validated + * @returns {boolean} - whether the object is valid or not + */ +async function validateOutput(t, data) { + return validateJSON(t, './schemas/output.json', data); +} +exports.validateOutput = validateOutput; + +/** + * Determine the path of the current git repo + * + * @param {string} dirname - the directory to start searching from. 
Defaults to + * `process.cwd()` + * @returns {string} - the filesystem path of the current git repo + */ +async function findGitRepoRootDirectory(dirname) { + if (dirname === undefined) return findGitRepoRootDirectory(path.dirname(process.cwd())); + + if (await fs.pathExists(path.join(dirname, '.git'))) return dirname; + + // This indicates that we've reached the root of the filesystem + if (path.dirname(dirname) === dirname) { + throw new Error('Unable to determine git repo root directory'); + } + + return findGitRepoRootDirectory(path.dirname(dirname)); +} +exports.findGitRepoRootDirectory = findGitRepoRootDirectory; diff --git a/packages/ingest/ftp.js b/packages/ingest/ftp.js index 16d83704328..673110b7254 100644 --- a/packages/ingest/ftp.js +++ b/packages/ingest/ftp.js @@ -1,15 +1,13 @@ 'use strict'; -const os = require('os'); -const fs = require('fs'); const JSFtp = require('jsftp'); const join = require('path').join; const log = require('@cumulus/common/log'); -const aws = require('@cumulus/common/aws'); const Crypto = require('./crypto').DefaultProvider; const recursion = require('./recursion'); +const { omit } = require('lodash'); -module.exports.ftpMixin = superclass => class extends superclass { +module.exports.ftpMixin = (superclass) => class extends superclass { constructor(...args) { super(...args); @@ -39,36 +37,31 @@ module.exports.ftpMixin = superclass => class extends superclass { } } - /** - * Downloads a given url and upload to a given S3 location - * @return {Promise} - * @private + /** + * Download a remote file to disk + * + * @param {string} remotePath - the full path to the remote file to be fetched + * @param {string} localPath - the full local destination file path + * @returns {Promise.} - the path that the file was saved to */ + async download(remotePath, localPath) { + const remoteUrl = `ftp://${this.host}${remotePath}`; + log.info(`Downloading ${remoteUrl} to ${localPath}`); - async sync(path, bucket, key, filename) { - const tempFile = await this.download(path, filename); - return this.upload(bucket, key, filename, tempFile); - } - - /** - * Downloads the file to disk, difference with sync is that - * this method involves no uploading to S3 - * @return {Promise} - * @private - */ - async download(path, filename) { if (!this.decrypted) await this.decrypt(); - // let's stream to file - const tempFile = join(os.tmpdir(), filename); const client = new JSFtp(this.options); return new Promise((resolve, reject) => { client.on('error', reject); - client.get(join(path, filename), tempFile, (err) => { + client.get(remotePath, localPath, (err) => { client.destroy(); - if (err) return reject(err); - return resolve(tempFile); + + if (err) reject(err); + else { + log.info(`Finishing downloading ${remoteUrl}`); + resolve(localPath); + } }); }); } @@ -115,14 +108,12 @@ module.exports.ftpMixin = superclass => class extends superclass { return reject(err); } - return resolve(data.map(d => ({ + return resolve(data.map((d) => ({ name: d.name, - type: d.type, - size: d.size, - time: d.date, - owner: d.owner, - group: d.group, - path: path + path: path, + size: parseInt(d.size, 10), + time: d.time, + type: d.type }))); }); }); @@ -139,7 +130,11 @@ module.exports.ftpMixin = superclass => class extends superclass { const listFn = this._list.bind(this); const files = await recursion(listFn, this.path); + log.info(`${files.length} files were found on ${this.host}`); - return files; + + // Type 'type' field is required to support recursive file listing, but + // should not be part of 
the returned result. + return files.map((file) => omit(file, 'type')); } }; diff --git a/packages/ingest/granule.js b/packages/ingest/granule.js index 8cf2072c441..73234e92487 100644 --- a/packages/ingest/granule.js +++ b/packages/ingest/granule.js @@ -1,16 +1,14 @@ -/* eslint-disable no-param-reassign */ 'use strict'; const aws = require('@cumulus/common/aws'); -const fs = require('fs'); +const fs = require('fs-extra'); const get = require('lodash.get'); -const join = require('path').join; +const os = require('os'); +const path = require('path'); const urljoin = require('url-join'); const cksum = require('cksum'); const checksum = require('checksum'); -const log = require('@cumulus/common/log'); const errors = require('@cumulus/common/errors'); -const queue = require('./queue'); const sftpMixin = require('./sftp'); const ftpMixin = require('./ftp').ftpMixin; const httpMixin = require('./http').httpMixin; @@ -69,7 +67,7 @@ class Discover { file.url_path = f.url_path; } else { - file.url_path = this.collection.url_path || '/'; + file.url_path = this.collection.url_path || ''; } } } @@ -81,14 +79,10 @@ class Discover { async discover() { // get list of files that matches a given path - const files = await this.list(); + const updatedFiles = (await this.list()) + .map((file) => this.setGranuleInfo(file)) + .filter((file) => file); - const updatedFiles = []; - // select files that match a given collection - files.forEach(f => { - const file = this.setGranuleInfo(f); - if (file) updatedFiles.push(file); - }); return await this.findNewGranules(updatedFiles); } @@ -135,43 +129,6 @@ class Discover { } } -/** - * This is a base class for discovering PDRs - * It must be mixed with a FTP or HTTP mixing to work - * - * @class - * @abstract - */ -class DiscoverAndQueue extends Discover { - /** - * Creates an instance of DiscoverAndQueue - * - * @param {Object} event - a Cumulus event - * @memberof DiscoverAndQueue - */ - constructor(event) { - super(event); - - this.queueUrl = event.config.queueUrl; - this.templateUri = event.config.templateUri; - } - - async findNewGranules(files) { - const granules = await super.findNewGranules(files); - return Promise.all(granules.map(g => queue.queueGranule( - g, - this.queueUrl, - this.templateUri, - this.provider, - this.collection, - null, - this.stack, - this.buckets.internal - ))); - } -} - - /** * This is a base class for ingesting and parsing a single PDR * It must be mixed with a FTP or HTTP mixing to work @@ -248,18 +205,29 @@ class Granule { return true; } - async _validateChecksum(type, value, tempFile, options) { - if (!options) options = {}; + /** + * Validate a file's checksum and throw an exception if it's invalid + * + * @param {Object} file - the file object to be checked + * @param {string} fileLocalPath - the path to the file on the filesystem + * @param {Object} [options={}] - options for the this._hash method + * @returns {undefined} - no return value, but throws an error if the + * checksum is invalid + * @memberof Granule + */ + async validateChecksum(file, fileLocalPath, options = {}) { + const [type, value] = await this.getChecksumFromFile(file); + + if (!type || !value) return; + let sum = null; + if (type.toLowerCase() === 'cksum') sum = await this._cksum(fileLocalPath); + else sum = await this._hash(type, fileLocalPath, options); - if (type.toLowerCase() === 'cksum') { - sum = await this._cksum(tempFile); - } - else { - sum = await this._hash(type, tempFile, options); + if (value !== sum) { + const message = `Invalid checksum for 
${file.name} with type ${file.checksumType} and value ${file.checksumValue}`; // eslint-disable-line max-len + throw new errors.InvalidChecksum(message); } - - return value === sum; } async _cksum(tempFile) { @@ -281,109 +249,92 @@ class Granule { ); } - /** - * Ingest individual files - * @private - */ - async ingestFile(_file, duplicateHandling) { - const file = _file; - let exists = null; + async enableDuplicateHandling(bucket) { + // check that the bucket has versioning enabled + const versioning = await aws.s3().getBucketVersioning({ Bucket: bucket }).promise(); - // check if the file exists. - exists = await aws.s3ObjectExists({ Bucket: file.bucket, Key: join(file.url_path, file.name) }); - - if (duplicateHandling === 'version') { - // check that the bucket has versioning enabled - let versioning = await aws.s3().getBucketVersioning({ Bucket: file.bucket }).promise(); + // if not enabled, make it enabled + if (versioning.Status !== 'Enabled') { + return aws.s3().putBucketVersioning({ + Bucket: bucket, + VersioningConfiguration: { Status: 'Enabled' } }).promise(); + } + } - // if not enabled, make it enabled - if (versioning.Status !== 'Enabled') { - versioning = await aws.s3().putBucketVersioning({ - Bucket: file.bucket, - VersioningConfiguration: { Status: 'Enabled' } }).promise(); - } + async getChecksumFromFile(file) { + if (file.checksumType && file.checksumValue) { + return [file.checksumType, file.checksumValue]; } + else if (this.checksumFiles[file.name]) { + const checksumInfo = this.checksumFiles[file.name]; - if (!exists || duplicateHandling !== 'skip') { - // Either the file does not exist yet, or it does but - // we are replacing it with a more recent one or - // adding another version of it to the bucket + const checksumRemotePath = path.join(checksumInfo.path, checksumInfo.name); - // we considered a direct stream from source to S3 but since - // it doesn't work with FTP connections, we decided to always download - // and then upload - let tempFile; + const downloadDir = await fs.mkdtemp(`${os.tmpdir()}${path.sep}`); + const checksumLocalPath = path.join(downloadDir, checksumInfo.name); + + let checksumValue; try { - log.info(`downloading ${file.name}`); - tempFile = await this.download(file.path, file.name); - log.info(`downloaded ${file.name}`); + await this.download(checksumRemotePath, checksumLocalPath); + checksumValue = (await fs.readFile(checksumLocalPath, 'utf8')).split(' ')[0]; } - catch (e) { - if (e.message && e.message.includes('Unexpected HTTP status code: 403')) { - throw new errors.FileNotFound( - `${file.name} was not found on the server with 403 status` - ); - } - throw e; + finally { + await fs.remove(downloadDir); } - try { - let checksumType = null; - let checksumValue = null; + // assuming the type is md5 + return ['md5', checksumValue]; + } - if (file.checksumType && file.checksumValue) { - checksumType = file.checksumType; - checksumValue = file.checksumValue; - } - else if (this.checksumFiles[file.name]) { - const checksumInfo = this.checksumFiles[file.name]; + // No checksum found + return [null, null]; + } - log.info(`downloading ${checksumInfo.name}`); - const checksumFilepath = await this.download(checksumInfo.path, checksumInfo.name); - log.info(`downloaded ${checksumInfo.name}`); + /** + * Ingest individual files + * @private + */ + async ingestFile(file, duplicateHandling) { + // Check if the file exists + const exists = await aws.s3ObjectExists({ + Bucket: file.bucket, + Key: path.join(file.url_path, file.name) + }); - // expecting the 
type is md5 - checksumType = 'md5'; - checksumValue = fs.readFileSync(checksumFilepath, 'utf8').split(' ')[0]; - fs.unlinkSync(checksumFilepath); - } - else { - // If there is not a checksum, no need to validate - file.filename = await this.upload(file.bucket, file.url_path, file.name, tempFile); - return file; - } + // Exit early if we can + if (exists && duplicateHandling === 'skip') return file; - const validated = await this._validateChecksum( - checksumType, - checksumValue, - tempFile - ); + // Enable duplicate handling + if (duplicateHandling === 'version') this.enableDuplicateHandling(file.bucket); - if (validated) { - await this.upload(file.bucket, file.url_path, file.name, tempFile); - } - else { - throw new errors.InvalidChecksum( - `Invalid checksum for ${file.name} with ` + - `type ${file.checksumType} and value ${file.checksumValue}` - ); - } - } - catch (e) { - throw new errors.InvalidChecksum( - `Error evaluating checksum for ${file.name} with ` + - `type ${file.checksumType} and value ${file.checksumValue}` - ); - } + // Either the file does not exist yet, or it does but + // we are replacing it with a more recent one or + // adding another version of it to the bucket - // delete temp file - fs.stat(tempFile, (err, stat) => { - if (stat) fs.unlinkSync(tempFile); - }); - } + // we considered a direct stream from source to S3 but since + // it doesn't work with FTP connections, we decided to always download + // and then upload - file.filename = `s3://${file.bucket}/${join(file.url_path, file.name)}`; - return file; + const downloadDir = await this.createDownloadDirectory(); + try { + const fileLocalPath = path.join(downloadDir, file.name); + const fileRemotePath = path.join(file.path, file.name); + + // Download the file + await this.download(fileRemotePath, fileLocalPath); + + // Validate the checksum + await this.validateChecksum(file, fileLocalPath); + + // Upload the file + const filename = await this.upload(file.bucket, file.url_path, file.name, fileLocalPath); + + return Object.assign({}, file, { filename }); + } + finally { + // Delete the temp directory + await fs.remove(downloadDir); + } } } @@ -392,31 +343,16 @@ class Granule { */ class HttpDiscoverGranules extends httpMixin(baseProtocol(Discover)) {} -/** - * A class for discovering granules using HTTP or HTTPS and queueing them to SQS. - */ -class HttpDiscoverAndQueueGranules extends httpMixin(baseProtocol(DiscoverAndQueue)) {} - /** * A class for discovering granules using SFTP. */ class SftpDiscoverGranules extends sftpMixin(baseProtocol(Discover)) {} -/** - * A class for discovering granules using SFTP and queueing them to SQS. - */ -class SftpDiscoverAndQueueGranules extends sftpMixin(baseProtocol(DiscoverAndQueue)) {} - /** * A class for discovering granules using FTP. */ class FtpDiscoverGranules extends ftpMixin(baseProtocol(Discover)) {} -/** - * A class for discovering granules using FTP and queueing them to SQS. - */ -class FtpDiscoverAndQueueGranules extends ftpMixin(baseProtocol(DiscoverAndQueue)) {} - /** * Ingest Granule from an FTP endpoint. 
*/ @@ -437,19 +373,18 @@ class HttpGranule extends httpMixin(baseProtocol(Granule)) {} * * @param {string} type -`discover` or `ingest` * @param {string} protocol -`sftp`, `ftp`, or `http` -* @param {boolean} q - set to `true` to queue granules * @returns {function} - a constructor to create a granule discovery object **/ -function selector(type, protocol, q) { +function selector(type, protocol) { if (type === 'discover') { switch (protocol) { case 'sftp': - return q ? SftpDiscoverAndQueueGranules : SftpDiscoverGranules; + return SftpDiscoverGranules; case 'ftp': - return q ? FtpDiscoverAndQueueGranules : FtpDiscoverGranules; + return FtpDiscoverGranules; case 'http': case 'https': - return q ? HttpDiscoverAndQueueGranules : HttpDiscoverGranules; + return HttpDiscoverGranules; default: throw new Error(`Protocol ${protocol} is not supported.`); } @@ -475,8 +410,5 @@ module.exports.HttpGranule = HttpGranule; module.exports.FtpGranule = FtpGranule; module.exports.SftpGranule = SftpGranule; module.exports.SftpDiscoverGranules = SftpDiscoverGranules; -module.exports.SftpDiscoverAndQueueGranules = SftpDiscoverAndQueueGranules; module.exports.FtpDiscoverGranules = FtpDiscoverGranules; -module.exports.FtpDiscoverAndQueueGranules = FtpDiscoverAndQueueGranules; module.exports.HttpDiscoverGranules = HttpDiscoverGranules; -module.exports.HttpDiscoverAndQueueGranules = HttpDiscoverAndQueueGranules; diff --git a/packages/ingest/http.js b/packages/ingest/http.js index f9c940c07d1..496e6826681 100644 --- a/packages/ingest/http.js +++ b/packages/ingest/http.js @@ -1,15 +1,14 @@ 'use strict'; const fs = require('fs'); -const os = require('os'); const path = require('path'); const urljoin = require('url-join'); const Crawler = require('simplecrawler'); const http = require('http'); const https = require('https'); +const log = require('@cumulus/common/log'); const mkdirp = require('mkdirp'); const pump = require('pump'); -const syncUrl = require('@cumulus/common/aws').syncUrl; const errors = require('@cumulus/common/errors'); /** @@ -28,6 +27,7 @@ async function downloadToDisk(url, filepath) { err.code = res.statusCode; return reject(err); } + // FIXME The download directory will exist, so this mkdirp can be removed return mkdirp(path.dirname(filepath), (err) => { if (err) return reject(err); const file = fs.createWriteStream(filepath); @@ -65,7 +65,7 @@ module.exports.httpMixin = (superclass) => class extends superclass { for (const line of lines) { const split = line.trim().split(pattern); if (split.length === 3) { - // Some providers provide files with one number after the dot (".") ex (tmtdayacz8110_5.6) + // Some providers provide files with one number after the dot (".") ex (tmtdayacz8110_5.6) if (split[1].match(/^(.*\.[\w\d]{1,4})$/) !== null) { const name = split[1]; files.push({ @@ -96,35 +96,29 @@ module.exports.httpMixin = (superclass) => class extends superclass { }); } - /** - * Downloads a given url and upload to a given S3 location + /** + * Download a remote file to disk * - * @param {string} _path - the path to download the file from - * @param {string} bucket - the bucket to upload the file to - * @param {string} key - the file prefix on s3 - * @param {string} filename - the filename - * @returns {Promise.} a s3 uri + * @param {string} remotePath - the full path to the remote file to be fetched + * @param {string} localPath - the full local destination file path + * @returns {Promise.} - the path that the file was saved to */ - async sync(_path, bucket, key, filename) { - await 
syncUrl(urljoin(this.host, _path, filename), bucket, path.join(key, filename)); - return urljoin('s3://', bucket, key, filename); - } + async download(remotePath, localPath) { + const remoteUrl = urljoin(this.host, remotePath); - /** - * Downloads the file to disk, difference with sync is that - * this method involves no uploading to S3 - * - * @param {string} _path - the path to download the file from - * @param {string} filename - the filename - * @returns {Promise.} the path to the temp file - */ - async download(_path, filename) { - // let's stream to file - const tempFile = path.join(os.tmpdir(), filename); - const uri = urljoin(this.host, _path, filename); - - await downloadToDisk(uri, tempFile); + log.info(`Downloading ${remoteUrl} to ${localPath}`); + try { + await downloadToDisk(remoteUrl, localPath); + } + catch (e) { + if (e.message && e.message.includes('Unexpected HTTP status code: 403')) { + const message = `${path.basename(remotePath)} was not found on the server with 403 status`; + throw new errors.FileNotFound(message); + } + else throw e; + } + log.info(`Finishing downloading ${remoteUrl}`); - return tempFile; + return localPath; } }; diff --git a/packages/ingest/package.json b/packages/ingest/package.json index fca11e7d0f1..92051e3b019 100644 --- a/packages/ingest/package.json +++ b/packages/ingest/package.json @@ -50,9 +50,11 @@ "babel-register": "^6.26.0", "checksum": "^0.1.1", "cksum": "^1.3.0", + "fs-extra": "^5.0.0", "got": "^7.1.0", "jsftp": "^2.0.0", "json-loader": "~0.5.7", + "lodash": "^4.17.5", "lodash.get": "^4.4.2", "mkdirp": "^0.5.1", "moment": "^2.6.0", diff --git a/packages/ingest/parse-pdr.js b/packages/ingest/parse-pdr.js index 36f0de12839..07e5c0acf37 100644 --- a/packages/ingest/parse-pdr.js +++ b/packages/ingest/parse-pdr.js @@ -62,8 +62,14 @@ function parseSpec(pdrName, spec) { throw new PDRParsingError('FILE_CKSUM_VALUE', pdrName); } - const name = filename; - return { path, name, fileSize, checksumType, checksumValue }; + const parsedSpec = { + path, + fileSize, + name: filename + }; + if (checksumType) parsedSpec.checksumType = checksumType; + if (checksumValue) parsedSpec.checksumValue = checksumValue; + return parsedSpec; } module.exports.parseSpec = parseSpec; @@ -79,7 +85,8 @@ function extractGranuleId(fileName, regex) { module.exports.parsePdr = function parsePdr(pdrFilePath, collection, pdrName) { // then read the file and and pass it to parser - const pdrFile = fs.readFileSync(pdrFilePath); + const pdrFile = fs.readFileSync(pdrFilePath, 'utf8'); + const obj = { granules: [] }; @@ -87,7 +94,7 @@ module.exports.parsePdr = function parsePdr(pdrFilePath, collection, pdrName) { // because MODAPS PDRs do not follow the standard ODL spec // we have to make sure there are spaces before and after every // question mark - let pdrString = pdrFile.toString().replace(/((\w*)=(\w*))/g, '$2 = $3'); + let pdrString = pdrFile.replace(/((\w*)=(\w*))/g, '$2 = $3'); // temporary fix for PVL not recognizing quoted strings as symbols pdrString = pdrString.replace(/"/g, ''); diff --git a/packages/ingest/pdr.js b/packages/ingest/pdr.js index 4619c7c5ca4..22e00bea476 100644 --- a/packages/ingest/pdr.js +++ b/packages/ingest/pdr.js @@ -1,16 +1,15 @@ 'use strict'; -const path = require('path'); +const aws = require('@cumulus/common/aws'); +const fs = require('fs-extra'); +const ftpMixin = require('./ftp').ftpMixin; const get = require('lodash.get'); +const httpMixin = require('./http').httpMixin; const log = require('@cumulus/common/log'); -const { 
MismatchPdrCollection } = require('@cumulus/common/errors'); const parsePdr = require('./parse-pdr').parsePdr; -const ftpMixin = require('./ftp').ftpMixin; -const httpMixin = require('./http').httpMixin; +const path = require('path'); const sftpMixin = require('./sftp'); -const aws = require('@cumulus/common/aws'); -const { S3 } = require('./aws'); -const queue = require('./queue'); + const { baseProtocol } = require('./protocol'); /** @@ -26,10 +25,7 @@ class Discover { bucket, collection, provider, - queueUrl, - templateUri, folder = 'pdrs', - queueLimit = null ) { if (this.constructor === Discover) { throw new TypeError('Can not construct abstract class.'); @@ -40,8 +36,6 @@ class Discover { this.collection = collection; this.provider = provider; this.folder = folder; - this.queueUrl = queueUrl; - this.templateUri = templateUri; // get authentication information this.port = get(this.provider, 'port', 21); @@ -49,12 +43,6 @@ class Discover { this.path = this.collection.provider_path || '/'; this.username = get(this.provider, 'username', null); this.password = get(this.provider, 'password', null); - this.limit = queueLimit; - } - - filterPdrs(pdr) { - const test = new RegExp(/^(.*\.PDR)$/); - return pdr.name.match(test) !== null; } /** @@ -62,13 +50,10 @@ class Discover { * @return {Promise} * @public */ - async discover() { - let files = await this.list(); - - // filter out non pdr files - files = files.filter(f => this.filterPdrs(f)); - return this.findNewPdrs(files); + const files = await this.list(); + const pdrs = files.filter((file) => file.name.endsWith('.PDR')); + return this.findNewPdrs(pdrs); } /** @@ -105,28 +90,6 @@ class Discover { } } -/** - * This is a base class for discovering PDRs - * It must be mixed with a FTP or HTTP mixing to work - * - * @class - * @abstract - */ -class DiscoverAndQueue extends Discover { - async findNewPdrs(pdrs) { - let newPdrs = await super.findNewPdrs(pdrs); - if (this.limit) newPdrs = newPdrs.slice(0, this.limit); - return Promise.all(newPdrs.map((p) => queue.queuePdr( - this.queueUrl, - this.templateUri, - this.provider, - this.collection, - p - ))); - } -} - - /** * This is a base class for ingesting and parsing a single PDR * It must be mixed with a FTP or HTTP mixing to work @@ -141,8 +104,6 @@ class Parse { bucket, collection, provider, - queueUrl, - templateUri, folder = 'pdrs') { if (this.constructor === Parse) { throw new TypeError('Can not construct abstract class.'); @@ -154,8 +115,6 @@ class Parse { this.collection = collection; this.provider = provider; this.folder = folder; - this.queueUrl = queueUrl; - this.templateUri = templateUri; this.port = get(this.provider, 'port', 21); this.host = get(this.provider, 'host', null); @@ -177,23 +136,33 @@ class Parse { /** * Copy the PDR to S3 and parse it * - * @return {Promise} + * @returns {Promise} - the list of granules in the PDR * @public */ async ingest() { - // download - const pdrLocalPath = await this.download(this.pdr.path, this.pdr.name); - - // parse the PDR - const granules = await this.parse(pdrLocalPath); - - // upload only if the parse was successful - await this.upload( - this.bucket, - path.join(this.stack, this.folder), - this.pdr.name, - pdrLocalPath - ); + // download the PDR + const downloadDir = await this.createDownloadDirectory(); + const pdrLocalPath = path.join(downloadDir, this.pdr.name); + const pdrRemotePath = path.join(this.pdr.path, this.pdr.name); + await this.download(pdrRemotePath, pdrLocalPath); + + let granules; + try { + // parse the PDR + granules = 
await this.parse(pdrLocalPath); + + // upload only if the parse was successful + await this.upload( + this.bucket, + path.join(this.stack, this.folder), + this.pdr.name, + pdrLocalPath + ); + } + finally { + // Clean up the temporary download directory + await fs.remove(downloadDir); + } // return list of all granules found in the PDR return granules; @@ -227,79 +196,6 @@ class Parse { } } -/** - * This is a base class for discovering PDRs - * It must be mixed with a FTP or HTTP mixing to work - * - * @class - * @abstract - */ -class ParseAndQueue extends Parse { - async ingest() { - const payload = await super.ingest(); - const collections = {}; - - //payload.granules = payload.granules.slice(0, 10); - - // make sure all parsed granules have the correct collection - for (const g of payload.granules) { - if (!collections[g.dataType]) { - // if the collection is not provided in the payload - // get it from S3 - if (g.dataType !== this.collection.name) { - const bucket = this.bucket; - const key = `${this.stack}` + - `/collections/${g.dataType}.json`; - let file; - try { - file = await S3.get(bucket, key); - } - catch (e) { - throw new MismatchPdrCollection( - `${g.dataType} dataType in ${this.pdr.name} doesn't match ${this.collection.name}` - ); - } - - collections[g.dataType] = JSON.parse(file.Body.toString()); - } - else { - collections[g.dataType] = this.collection; - } - } - - g.granuleId = this.extractGranuleId( - g.files[0].name, - collections[g.dataType].granuleIdExtraction - ); - } - - log.info(`Queueing ${payload.granules.length} granules to be processed`); - - const names = await Promise.all( - payload.granules.map((g) => queue.queueGranule( - g, - this.queueUrl, - this.templateUri, - this.provider, - collections[g.dataType], - this.pdr, - this.stack, - this.bucket - )) - ); - - let isFinished = false; - const running = names.filter((n) => n[0] === 'running').map((n) => n[1]); - const completed = names.filter((n) => n[0] === 'completed').map((n) => n[1]); - const failed = names.filter((n) => n[0] === 'failed').map((n) => n[1]); - if (running.length === 0) { - isFinished = true; - } - - return { running, completed, failed, isFinished }; - } -} - /** * Discover PDRs from a FTP endpoint. * @@ -324,30 +220,6 @@ class HttpDiscover extends httpMixin(baseProtocol(Discover)) {} class SftpDiscover extends sftpMixin(baseProtocol(Discover)) {} -/** - * Discover and Queue PDRs from a FTP endpoint. - * - * @class - */ - -class FtpDiscoverAndQueue extends ftpMixin(baseProtocol(DiscoverAndQueue)) {} - -/** - * Discover and Queue PDRs from a HTTP endpoint. - * - * @class - */ - -class HttpDiscoverAndQueue extends httpMixin(baseProtocol(DiscoverAndQueue)) {} - -/** - * Discover and Queue PDRs from a SFTP endpoint. - * - * @class - */ - -class SftpDiscoverAndQueue extends sftpMixin(baseProtocol(DiscoverAndQueue)) {} - /** * Parse PDRs downloaded from a FTP endpoint. * @@ -372,47 +244,22 @@ class HttpParse extends httpMixin(baseProtocol(Parse)) {} class SftpParse extends sftpMixin(baseProtocol(Parse)) {} -/** - * Parse and Queue PDRs downloaded from a FTP endpoint. - * - * @class - */ - -class FtpParseAndQueue extends ftpMixin(baseProtocol(ParseAndQueue)) {} - -/** - * Parse and Queue PDRs downloaded from a HTTP endpoint. - * - * @class - */ - -class HttpParseAndQueue extends httpMixin(baseProtocol(ParseAndQueue)) {} - -/** - * Parse and Queue PDRs downloaded from a SFTP endpoint. 
- * - * @classc - */ - -class SftpParseAndQueue extends sftpMixin(baseProtocol(ParseAndQueue)) {} - /** * Select a class for discovering PDRs based on protocol * * @param {string} type –- `discover` or `parse` * @param {string} protocol –- `sftp`, `ftp`, or `http` - * @param {boolean} q - set to `true` to queue pdrs * @returns {function} - a constructor to create a PDR discovery object */ -function selector(type, protocol, q) { +function selector(type, protocol) { if (type === 'discover') { switch (protocol) { case 'http': - return q ? HttpDiscoverAndQueue : HttpDiscover; + return HttpDiscover; case 'ftp': - return q ? FtpDiscoverAndQueue : FtpDiscover; + return FtpDiscover; case 'sftp': - return q ? SftpDiscoverAndQueue : SftpDiscover; + return SftpDiscover; default: throw new Error(`Protocol ${protocol} is not supported.`); } @@ -420,11 +267,11 @@ function selector(type, protocol, q) { else if (type === 'parse') { switch (protocol) { case 'http': - return q ? HttpParseAndQueue : HttpParse; + return HttpParse; case 'ftp': - return q ? FtpParseAndQueue : FtpParse; + return FtpParse; case 'sftp': - return q ? SftpParseAndQueue : SftpParseAndQueue; + return SftpParse; default: throw new Error(`Protocol ${protocol} is not supported.`); } @@ -440,6 +287,3 @@ module.exports.SftpParse = SftpParse; module.exports.FtpDiscover = FtpDiscover; module.exports.HttpDiscover = HttpDiscover; module.exports.SftpDiscover = SftpDiscover; -module.exports.FtpDiscoverAndQueue = FtpDiscoverAndQueue; -module.exports.HttpDiscoverAndQueue = HttpDiscoverAndQueue; -module.exports.SftpDiscoverAndQueue = SftpDiscoverAndQueue; diff --git a/packages/ingest/protocol.js b/packages/ingest/protocol.js index 3d38a496af6..97a2bd951cd 100644 --- a/packages/ingest/protocol.js +++ b/packages/ingest/protocol.js @@ -1,9 +1,10 @@ 'use strict'; -const path = require('path'); const aws = require('@cumulus/common/aws'); -const fs = require('fs'); +const fs = require('fs-extra'); const log = require('@cumulus/common/log'); +const os = require('os'); +const path = require('path'); /** * the base class mixin used by all the protocol sub-classes @@ -13,7 +14,17 @@ const log = require('@cumulus/common/log'); * by other mixins. 
It also provides a unified upload method * to S3 */ -module.exports.baseProtocol = superclass => class extends superclass { +module.exports.baseProtocol = (superclass) => class extends superclass { + + /** + * Create a temporary directory + * + * @returns {string} - a temporary directory name + */ + createDownloadDirectory() { + const prefix = `${os.tmpdir()}${path.sep}`; + return fs.mkdtemp(prefix); + } /** * List files of a given path @@ -64,12 +75,13 @@ module.exports.baseProtocol = superclass => class extends superclass { } /** - * Download the file to disk, difference with sync is that - * this method involves no uploading to S3 + * Download a remote file to disk * - * @returns {*} undefined + * @param {string} remotePath - the full path to the remote file to be fetched + * @param {string} localPath - the full local destination file path + * @returns {Promise.} - the path that the file was saved to */ - download() { + download(remotePath, localPath) { // eslint-disable-line no-unused-vars throw new TypeError('method not implemented'); } diff --git a/packages/ingest/sftp.js b/packages/ingest/sftp.js index 4db448d415e..40e95e5e00c 100644 --- a/packages/ingest/sftp.js +++ b/packages/ingest/sftp.js @@ -1,19 +1,14 @@ 'use strict'; -const fs = require('fs'); -const os = require('os'); const Client = require('ssh2').Client; const join = require('path').join; -const urljoin = require('url-join'); const log = require('@cumulus/common/log'); -//const errors = require('@cumulus/common/errors'); -const S3 = require('./aws').S3; const Crypto = require('./crypto').DefaultProvider; const recursion = require('./recursion'); -//const PathIsInvalid = errors.createErrorType('PathIsInvalid'); +const { omit } = require('lodash'); -module.exports = superclass => class extends superclass { +module.exports = (superclass) => class extends superclass { constructor(...args) { super(...args); @@ -63,39 +58,27 @@ module.exports = superclass => class extends superclass { return this.client.end(); } - /** - * Downloads a given url and upload to a given S3 location - * @return {Promise} - * @private - */ - - async sync(path, bucket, key, filename) { - const tempFile = await this.download(path, filename); - return this.upload(bucket, key, filename, tempFile); - } - - /** - * Downloads the file to disk, difference with sync is that - * this method involves no uploading to S3 - * @return {Promise} - * @private + /** + * Download a remote file to disk + * + * @param {string} remotePath - the full path to the remote file to be fetched + * @param {string} localPath - the full local destination file path + * @returns {Promise.} - the path that the file was saved to */ - - async download(path, filename) { - // let's stream to file + async download(remotePath, localPath) { if (!this.connected) await this.connect(); - const tempFile = join(os.tmpdir(), filename); - const remoteFile = join(path, filename); - log.info({ filename }, `Downloading to ${tempFile}`); + const remoteUrl = `sftp://${this.host}${remotePath}`; + log.info(`Downloading ${remoteUrl} to ${localPath}`); return new Promise((resolve, reject) => { - this.sftp.fastGet(remoteFile, tempFile, (e) => { + this.sftp.fastGet(remotePath, localPath, (e) => { if (e) return reject(e); - log.info({ filename }, `Finishing downloading ${this.filename}`); - return (resolve(tempFile)); + + log.info(`Finishing downloading ${remoteUrl}`); + return resolve(localPath); }); - this.client.on('error', (e) => reject(e)); + this.client.on('error', reject); }); } @@ -130,14 +113,12 @@ 
module.exports = superclass => class extends superclass { } return reject(err); } - return resolve(list.map(i => ({ + return resolve(list.map((i) => ({ name: i.filename, + path: path, type: i.longname.substr(0, 1), size: i.attrs.size, - time: i.attrs.mtime, - owner: i.attrs.uid, - group: i.attrs.gid, - path: path + time: i.attrs.mtime * 1000 }))); }); }); @@ -153,6 +134,9 @@ module.exports = superclass => class extends superclass { const listFn = this._list.bind(this); const files = await recursion(listFn, this.path); log.info({ host: this.host }, `${files.length} files were found on ${this.host}`); - return files; + + // Type 'type' field is required to support recursive file listing, but + // should not be part of the returned result. + return files.map((file) => omit(file, 'type')); } }; diff --git a/packages/ingest/test/granule.js b/packages/ingest/test/granule.js index 9d4723cc4f2..dee278f80a9 100644 --- a/packages/ingest/test/granule.js +++ b/packages/ingest/test/granule.js @@ -11,10 +11,7 @@ const { HttpGranule, FtpDiscoverGranules, HttpDiscoverGranules, - SftpDiscoverGranules, - FtpDiscoverAndQueueGranules, - HttpDiscoverAndQueueGranules, - SftpDiscoverAndQueueGranules + SftpDiscoverGranules } = require('../granule'); /** @@ -26,10 +23,6 @@ const selectorDiscoverTypes = [ { cls: HttpDiscoverGranules, type: 'discover', protocol: 'http' }, { cls: HttpDiscoverGranules, type: 'discover', protocol: 'https' }, { cls: SftpDiscoverGranules, type: 'discover', protocol: 'sftp' }, - { cls: FtpDiscoverAndQueueGranules, type: 'discover', protocol: 'ftp', queue: true }, - { cls: HttpDiscoverAndQueueGranules, type: 'discover', protocol: 'http', queue: true }, - { cls: HttpDiscoverAndQueueGranules, type: 'discover', protocol: 'https', queue: true }, - { cls: SftpDiscoverAndQueueGranules, type: 'discover', protocol: 'sftp', queue: true } ]; const selectorSyncTypes = [ @@ -61,20 +54,27 @@ selectorSyncTypes.forEach((item) => { }); /** -* test the granule._validateChecksum() method +* test the granule.validateChecksum() method **/ const sums = require('./fixtures/sums'); Object.keys(sums).forEach((key) => { - test(`granule._validateChecksum ${key}`, async (t) => { + test(`granule.validateChecksum ${key}`, async (t) => { const granule = new HttpGranule( ingestPayload.config.buckets, ingestPayload.config.collection, ingestPayload.config.provider ); const filepath = path.join(__dirname, 'fixtures', `${key}.txt`); - const validated = await granule._validateChecksum(key, sums[key], filepath); - t.true(validated); + try { + const file = { checksumType: key, checksumValue: sums[key] }; + await granule.validateChecksum(file, filepath, null); + await granule.validateChecksum(key, sums[key], filepath); + t.pass(); + } + catch (e) { + t.fail(e); + } }); }); diff --git a/tmp-test-data/.gitignore b/tmp-test-data/.gitignore new file mode 100644 index 00000000000..f8476208f65 --- /dev/null +++ b/tmp-test-data/.gitignore @@ -0,0 +1,3 @@ +* +!README +!.gitignore diff --git a/tmp-test-data/README b/tmp-test-data/README new file mode 100644 index 00000000000..3ae916b208b --- /dev/null +++ b/tmp-test-data/README @@ -0,0 +1,6 @@ +The http and sftp containers managed by docker-compose.yml will serve out any +files in this directory. The intention is that any files being fetched using +http or sftp should be copied into this directory (probably under a +subdirectory), fetched, and then deleted. + +The docker containers defined in docker-compose.yml are used for testing.
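As an illustration of that staging workflow, here is a minimal sketch that writes files under tmp-test-data, hands the subdirectory name to a fetch callback, and always cleans up afterwards. It assumes the fs-extra dependency and the findGitRepoRootDirectory helper introduced in this change are available from @cumulus/common/test-utils; the withStagedTestFiles wrapper, the staging-* subdirectory name, and the file contents are placeholders used only for illustration.

'use strict';

const fs = require('fs-extra');
const path = require('path');
const { findGitRepoRootDirectory } = require('@cumulus/common/test-utils');

// Stage placeholder files under tmp-test-data, let the caller fetch them over
// http or sftp from the docker-compose containers, then always clean up.
async function withStagedTestFiles(fileNames, fetchFiles) {
  const gitRepoRootDirectory = await findGitRepoRootDirectory();
  const stagingDir = path.join(gitRepoRootDirectory, 'tmp-test-data', `staging-${Date.now()}`);

  await fs.ensureDir(stagingDir);
  await Promise.all(fileNames.map((name) =>
    fs.outputFile(path.join(stagingDir, name), `This is ${name}`)));

  try {
    // fetchFiles receives the subdirectory name to use as the provider path
    return await fetchFiles(path.basename(stagingDir));
  }
  finally {
    // Leave tmp-test-data containing only README and .gitignore
    await fs.remove(stagingDir);
  }
}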
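More generally, the download-and-clean-up flow that granule.js, pdr.js, and protocol.js now share reduces to the short sketch below. It assumes only Node core and fs-extra; download is a stand-in for the protocol mixins' download(remotePath, localPath), and withDownloadedFile and processFile are hypothetical names chosen here for illustration.

'use strict';

const fs = require('fs-extra');
const os = require('os');
const path = require('path');

// Sketch of the createDownloadDirectory()/download()/cleanup pattern:
// fetch a remote file into a unique temp directory, process it, and
// remove the directory no matter what happens.
async function withDownloadedFile(download, remoteDir, fileName, processFile) {
  // Equivalent of createDownloadDirectory(): one temp directory per fetch
  const downloadDir = await fs.mkdtemp(`${os.tmpdir()}${path.sep}`);
  const localPath = path.join(downloadDir, fileName);

  try {
    await download(path.join(remoteDir, fileName), localPath);
    // e.g. validate a checksum and upload to S3, as ingestFile() does
    return await processFile(localPath);
  }
  finally {
    // The temp directory is removed whether or not download or processing succeeded
    await fs.remove(downloadDir);
  }
}

Because each fetch gets its own fs.mkdtemp directory and the finally block always removes it, concurrent ingests cannot collide on file names and failed downloads do not accumulate in the system temp directory.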