Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

From nsidc cumulus 3220: Create a new send-pan task #3416

Merged
merged 65 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
25e998c
release v15.0.0 (#3303)
jennyhliu Mar 10, 2023
15986ab
fix @aws-sdk/[email protected] error
jennyhliu Mar 10, 2023
0368752
Added a new send-pan task - more to be added to it
mikedorfman Mar 23, 2023
dad58f9
Added send-pan task in TF
mikedorfman Mar 23, 2023
0526ddd
Fix linter warnings
colecu Mar 23, 2023
f184527
Fix PVL dependency issue
colecu Mar 24, 2023
42b326e
Cleanup ESLint warnings
colecu Mar 24, 2023
98edd62
Add support for generating and writing the PAN locally
colecu Mar 24, 2023
0fa8ee3
Update `got` package version
colecu Mar 24, 2023
4f23fba
Updated to make pan->pdr replacement case insensitive
mikedorfman Mar 27, 2023
c394bd8
Merge remote-tracking branch 'origin/NDCUM-914' into NDCUM-914
mikedorfman Mar 27, 2023
13d4f66
Removed unused uploader methods
mikedorfman Mar 27, 2023
90adec8
Merge pull request #2 from nsidc/NDCUM-914
colecu Mar 27, 2023
a238f09
Fixed pdr replacement so we replace only instances of .pdr
mikedorfman Mar 27, 2023
3ac6f5d
Merge pull request #3 from nsidc/NDCUM-914
colecu Mar 27, 2023
8738c3c
Updated send_pan_test to include validation of PAN message
mikedorfman Mar 27, 2023
763bfea
Updated several references to `../t` in pvl tests
mikedorfman Mar 27, 2023
add1d04
Added details about the send-pan-task to the CHANGELOG
mikedorfman Mar 27, 2023
ea822cc
Update xml2js 0.4.22->0.5 strict (#3330) (#3339)
jennyhliu Apr 20, 2023
2223e93
CUMULUS-3285: Updated isAuthBearTokenRequest to handle non-Bearer aut…
jennyhliu Apr 20, 2023
723694e
update version to 15.0.1
jennyhliu Apr 20, 2023
ae4627c
update changelog
jennyhliu Apr 20, 2023
fb73047
Merge pull request #3342 from nasa/release-15.0.1
jennyhliu Apr 20, 2023
60f9950
Release 15.0.2 (#3349)
Nnaga1 Apr 26, 2023
4c85ec3
CUMULUS-3243:Updated granule delete logic (#3338)
jennyhliu Apr 28, 2023
8357358
update to v15.0.3
jennyhliu Apr 28, 2023
82bebfd
update changelog
jennyhliu Apr 28, 2023
5fd5feb
Merge pull request #3361 from nasa/release-15.0.3
jennyhliu Apr 28, 2023
7db392e
pvl move t.js to index.js
jennyhliu Jun 23, 2023
47696f0
Merge branch 'CUMULUS-3220' of https://github.com/nsidc/cumulus into …
jennyhliu Jun 23, 2023
3765df7
Merge branch 'nsidc-CUMULUS-3220' into from-nsidc-CUMULUS-3220
jennyhliu Jun 23, 2023
bb4a1b2
Merge branch 'master' of https://github.com/nasa/cumulus into from-ns…
jennyhliu Jun 23, 2023
ecf25a1
fix version
jennyhliu Jun 23, 2023
e203b02
fix package version
jennyhliu Jun 23, 2023
f55af3e
Merge branch 'master' into from-nsidc-CUMULUS-3220
jennyhliu Jun 26, 2023
d070006
remove tasks/send-pan/package-lock.json
jennyhliu Jun 26, 2023
dc63ffe
fix lint
jennyhliu Jun 28, 2023
15d540a
Merge branch 'master' into from-nsidc-CUMULUS-3220
jennyhliu Jul 17, 2023
fada800
add pdrHelpers
jennyhliu Jul 17, 2023
8a13389
Merge branch 'master' into from-nsidc-CUMULUS-3220
jennyhliu Jul 17, 2023
633147c
add unit test code coverage
jennyhliu Jul 18, 2023
81b0c12
Merge branch 'from-nsidc-CUMULUS-3220' of https://github.com/nasa/cum…
jennyhliu Jul 18, 2023
4fd0e03
add s3 pan support refactor
jennyhliu Jul 19, 2023
4af0841
add unit test refactor
jennyhliu Jul 20, 2023
506cfc0
refactor
jennyhliu Jul 20, 2023
0d4e0f6
add SendPan task to ParsePdr workflow
jennyhliu Jul 21, 2023
96fd320
add schema move pdr to input
jennyhliu Jul 21, 2023
ec051c8
default pans location
jennyhliu Jul 21, 2023
5ca9ca6
fix lint and integration test
jennyhliu Jul 21, 2023
91e03c1
fix lint fix orca lambda version
jennyhliu Jul 21, 2023
5c815c8
fix unit test coverage
jennyhliu Jul 22, 2023
4b57af4
convert to typescript
jennyhliu Jul 23, 2023
7f55d5b
Merge branch 'master' into from-nsidc-CUMULUS-3220
jennyhliu Jul 23, 2023
e0bdeb3
convert to typescript
jennyhliu Jul 23, 2023
621be22
fix test coverage value
jennyhliu Jul 23, 2023
c417cfb
fix typing, refactor
jennyhliu Jul 24, 2023
5b692a7
Merge branch 'master' into from-nsidc-CUMULUS-3220
jennyhliu Jul 24, 2023
42b85c0
fix unit test
jennyhliu Jul 25, 2023
1916e60
Merge branch 'from-nsidc-CUMULUS-3220' of https://github.com/nasa/cum…
jennyhliu Jul 25, 2023
e5ee765
Merge branch 'master' into from-nsidc-CUMULUS-3220
jennyhliu Jul 25, 2023
b4c784f
fix unit test
jennyhliu Jul 25, 2023
3e21ed7
Merge branch 'from-nsidc-CUMULUS-3220' of https://github.com/nasa/cum…
jennyhliu Jul 25, 2023
c669f07
pr feedback
jennyhliu Jul 27, 2023
41ffa64
Merge branch 'master' into from-nsidc-CUMULUS-3220
jennyhliu Jul 28, 2023
fd742fa
Merge branch 'master' into from-nsidc-CUMULUS-3220
jennyhliu Jul 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

## Unreleased

- **CUMULUS-3188**
- Updated QueueGranules to support queueing granules that meet the required API granule schema.
- Added optional additional properties to queue-granules input schema
### Added

- **CUMULUS-3220**
- Created a new send-pan task
- **CUMULUS-3287**
- Added variable to allow the aws_ecs_task_definition health check to be configurable.
- Added clarity to how the bucket field needs to be configured for the move-granules task definition
Expand All @@ -19,6 +20,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- **CUMULUS-2985**
- Changed `onetime` rules RuleTrigger to only execute when the state is `ENABLED` and updated documentation to reflect the change
- Changed the `invokeRerun` function to only re-run enabled rules
- **CUMULUS-3188**
- Updated QueueGranules to support queueing granules that meet the required API granule schema.
- Added optional additional properties to queue-granules input schema
- **CUMULUS-3252**
- Updated example/cumulus-tf/orca.tf to use orca v8.0.1
- Added cumulus task `@cumulus/orca-copy-to-archive-adapter`, and add the task to `tf-modules/ingest`
Expand Down
40 changes: 39 additions & 1 deletion example/cumulus-tf/parse_pdr_workflow.asl.json
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@
{
"Variable": "$.meta.isPdrFinished",
"BooleanEquals": true,
"Next": "WorkflowSucceeded"
"Next": "SendPAN"
}
],
"Default": "WorkflowSucceeded"
Expand Down Expand Up @@ -193,6 +193,44 @@
"Seconds": 10,
"Next": "CheckStatus"
},
"SendPAN": {
"Parameters": {
"cma": {
"event.$": "$",
"ReplaceConfig": {
"FullMessage": true
},
"task_config": {
"provider": "{$.meta.provider}",
"remoteDir": "{$.meta.collection.meta.panPath}"
}
}
},
"Type": "Task",
"Resource": "${send_pan_arn}",
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"ResultPath": "$.exception",
"Next": "WorkflowFailed"
}
],
"Next": "WorkflowSucceeded"
},
"WorkflowFailed": {
"Type": "Fail",
"Cause": "Workflow failed"
Expand Down
1 change: 1 addition & 0 deletions example/cumulus-tf/parse_pdr_workflow.tf
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module "parse_pdr_workflow" {
parse_pdr_task_arn: module.cumulus.parse_pdr_task.task_arn,
pdr_status_check_task_arn: module.cumulus.pdr_status_check_task.task_arn,
queue_granules_task_arn: module.cumulus.queue_granules_task.task_arn,
send_pan_arn: module.cumulus.send_pan_task.task_arn,
sf_sqs_report_task_arn: module.cumulus.sf_sqs_report_task.task_arn,
start_sf_queue_url: module.cumulus.start_sf_queue_url
}
Expand Down
3 changes: 2 additions & 1 deletion example/data/collections/s3_MOD09GQ_006/s3_MOD09GQ_006.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"url_path": "{cmrMetadata.Granule.Collection.ShortName}___{cmrMetadata.Granule.Collection.VersionId}/{substring(file.fileName, 0, 3)}",
"sampleFileName": "MOD09GQ.A2017025.h21v00.006.2017034065104.hdf",
"meta": {
"s3MultipartChunksizeMb": 16
"s3MultipartChunksizeMb": 16,
"panPath": "integration-test/pans/MOD09GQ___006"
},
"files": [
{
Expand Down
31 changes: 29 additions & 2 deletions example/spec/parallel/ingest/ingestFromPdrSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@

const flatten = require('lodash/flatten');
const cryptoRandomString = require('crypto-random-string');
const path = require('path');

const { deleteS3Object, s3ObjectExists } = require('@cumulus/aws-client/S3');
const { buildS3Uri, deleteS3Object, s3ObjectExists } = require('@cumulus/aws-client/S3');
const { s3 } = require('@cumulus/aws-client/services');
const { LambdaStep } = require('@cumulus/integration-tests/sfnStep');
const { getPdr } = require('@cumulus/api-client/pdrs');
Expand Down Expand Up @@ -478,7 +479,7 @@ describe('Ingesting from PDR', () => {

// the output of the CheckStatus is used to determine the task of choice
const checkStatusTaskName = 'CheckStatus';
const successStepName = 'WorkflowSucceeded';
const successStepName = 'SendPAN';
const pdrStatusReportTaskName = 'PdrStatusReport';

let choiceVerified = false;
Expand Down Expand Up @@ -516,6 +517,32 @@ describe('Ingesting from PDR', () => {
}
});
});

describe('SendPan lambda function', () => {
let lambdaOutput;
beforeAll(async () => {
try {
lambdaOutput = await lambdaStep.getStepOutput(parsePdrExecutionArn, 'SendPan');
} catch (error) {
beforeAllFailed = error;
}
});

// SfSnsReport lambda is used in the workflow multiple times, apparently, only the first output
it('has expected pan output', async () => {
if (beforeAllFailed) fail(beforeAllFailed);
const panName = lambdaOutput.payload.pdr.name.replace(/\.pdr/gi, '.pan');
const panKey = path.join(addedCollections[0].meta.panPath, panName);
const expectedPanUri = buildS3Uri(config.bucket, panKey);
expect(lambdaOutput.payload.pan.uri).toEqual(expectedPanUri);
const panExists = await s3ObjectExists({
Bucket: config.bucket,
Key: panKey,
});
expect(panExists).toEqual(true);
await deleteS3Object(config.bucket, panKey);
});
});
});

describe('the reporting lambda has received the CloudWatch step function event and', () => {
Expand Down
6 changes: 1 addition & 5 deletions example/spec/parallel/ingest/ingestPdrWithNodeNameSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,6 @@ describe('Ingesting from PDR', () => {
deleteFolder(config.bucket, testDataFolder),
cleanupCollections(config.stackName, config.bucket, collectionsDir, testSuffix),
cleanupProviders(config.stackName, config.bucket, providersDir, testSuffix),
apiTestUtils.deletePdr({
prefix: config.stackName,
pdr: pdrFilename,
}),
]).catch(console.error);

await providersApi.deleteProvider({
Expand Down Expand Up @@ -512,7 +508,7 @@ describe('Ingesting from PDR', () => {

// the output of the CheckStatus is used to determine the task of choice
const checkStatusTaskName = 'CheckStatus';
const successStepName = 'WorkflowSucceeded';
const successStepName = 'SendPAN';
jennyhliu marked this conversation as resolved.
Show resolved Hide resolved
const pdrStatusReportTaskName = 'PdrStatusReport';

let choiceVerified = false;
Expand Down
1 change: 1 addition & 0 deletions packages/api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ exports.models = require('./models');
exports.testUtils = require('./lib/testUtils');
exports.tokenUtils = require('./lib/token');
exports.snsRuleHelpers = require('./lib/snsRuleHelpers');
exports.pdrHelpers = require('./lib/pdrHelpers');
36 changes: 36 additions & 0 deletions packages/api/lib/pdrHelpers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict';

const pvl = require('@cumulus/pvl');

/**
* Generate PAN message
*
* @returns {string} the PAN message
*/
function generatePAN() {
return pvl.jsToPVL(
new pvl.models.PVLRoot()
.add('MESSAGE_TYPE', new pvl.models.PVLTextString('SHORTPAN'))
.add('DISPOSITION', new pvl.models.PVLTextString('SUCCESSFUL'))
.add('TIME_STAMP', new pvl.models.PVLDateTime(new Date()))
);
}

/**
* Generate a PDRD message with a given err
*
* @param {object} err - the error object
* @returns {string} the PDRD message
*/
function generatePDRD(err) {
return pvl.jsToPVL(
new pvl.models.PVLRoot()
.add('MESSAGE_TYPE', new pvl.models.PVLTextString('SHORTPDRD'))
.add('DISPOSITION', new pvl.models.PVLTextString(err.message))
);
}

module.exports = {
generatePAN,
generatePDRD,
};
38 changes: 36 additions & 2 deletions packages/ingest/src/HttpProviderClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const Crawler = require('simplecrawler');
const got = require('got');
const { CookieJar } = require('tough-cookie');
const { promisify } = require('util');
const stream = require('node:stream');

const {
buildS3Uri,
Expand Down Expand Up @@ -206,7 +207,7 @@ class HttpProviderClient {
/**
* Download a remote file to disk
*
* @param {Object} params
* @param {object} params
botanical marked this conversation as resolved.
Show resolved Hide resolved
* @param {string} params.remotePath - the full path to the remote file to be fetched
* @param {string} params.localPath - the full local destination file path
* @returns {Promise.<string>} - the path that the file was saved to
Expand Down Expand Up @@ -244,7 +245,7 @@ class HttpProviderClient {
/**
* Download the remote file to a given s3 location
*
* @param {Object} params
* @param {object} params
botanical marked this conversation as resolved.
Show resolved Hide resolved
* @param {string} params.fileRemotePath - the full path to the remote file to be fetched
* @param {string} params.destinationBucket - destination s3 bucket of the file
* @param {string} params.destinationKey - destination s3 key of the file
Expand Down Expand Up @@ -291,6 +292,39 @@ class HttpProviderClient {
return { s3uri, etag };
}

/**
* Upload a file
*
* @param {object} params
botanical marked this conversation as resolved.
Show resolved Hide resolved
* @param {string} params.localPath - the full local file path
* @param {string} params.uploadPath - the full remote file path for uploading file to
* @returns {Promise<string>} the uri of the uploaded file
*/
async upload(params) {
const { localPath, uploadPath } = params;
log.info(params);
await this.setUpGotOptions();
await this.downloadTLSCertificate();
const options = {
protocol: this.protocol,
host: this.host,
port: this.port,
path: uploadPath,
method: 'POST',
};

const remoteUrl = buildURL(options);
got.stream.options = options;
await promisify(pipeline)(
fs.createReadStream(localPath),
await got.stream.post(remoteUrl),
new stream.PassThrough()
);

log.info(`Finishing uploading ${localPath} to ${remoteUrl}`);
return remoteUrl;
}

/* eslint-disable no-empty-function */
async connect() {}

Expand Down
34 changes: 31 additions & 3 deletions packages/ingest/src/S3ProviderClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class S3ProviderClient implements ProviderClient {
/**
* Download a remote file to disk
*
* @param {Object} params
* @param {object} params
botanical marked this conversation as resolved.
Show resolved Hide resolved
* @param {string} params.remotePath - the full path to the remote file to be fetched
* @param {string} params.localPath - the full local destination file path
* @param {string} params.remoteAltBucket - alternate per-file bucket override to this.bucket
Expand Down Expand Up @@ -79,8 +79,8 @@ class S3ProviderClient implements ProviderClient {
/**
* Download the remote file to a given s3 location
*
* @param {Object} params - the full path to the remote file to be fetched
* @param {string} params.sourceKey - the full path to the remote file to be fetched
* @param {object} params
* @param {string} params.fileRemotePath - the full path to the remote file to be fetched
jennyhliu marked this conversation as resolved.
Show resolved Hide resolved
* @param {string} params.bucket - destination s3 bucket of the file
* @param {string} params.destinationBucket - destination s3 bucket of the file
* @param {string} params.destinationKey - destination s3 key of the file
Expand Down Expand Up @@ -145,6 +145,34 @@ class S3ProviderClient implements ProviderClient {
}
}

/**
* Upload a file
*
* @param {object} params
botanical marked this conversation as resolved.
Show resolved Hide resolved
* @param {string} params.localPath - the full local file path
* @param {string} params.uploadPath - the full remote file path for uploading file to
* @param {string} [params.remoteAltBucket] - alternate per-file bucket override to this.bucket
* @returns {Promise<string>} the uri of the uploaded file
*/
async upload(params: {
localPath: string,
uploadPath: string,
remoteAltBucket?: string,
}) {
const { localPath, uploadPath, remoteAltBucket } = params;
log.info(params);
const remoteBucket = remoteAltBucket || this.bucket;
await S3.putFile(
remoteBucket,
uploadPath,
localPath
);

const s3Uri = S3.buildS3Uri(remoteBucket, uploadPath);
log.info(`Finishing uploading ${localPath} to ${s3Uri}`);
return s3Uri;
}

/* eslint-disable @typescript-eslint/no-empty-function */
async connect(): Promise<void> {}

Expand Down
15 changes: 15 additions & 0 deletions packages/ingest/test/test-HttpProviderClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,18 @@ test.serial('list fails if client wait time is set less than the response delay'
}
);
});

test.serial('upload() attemps to upload a file', async (t) => {
const localPath = path.join(tmpdir(), randomString());
t.teardown(() => fs.unlinkSync(localPath));
const uploadPath = path.join(randomString(), 'destinationfile.txt');
fs.writeFileSync(localPath, randomString());

const { httpProviderClient } = t.context;
nock('http://localhost:3030')
.post(path.join('/', uploadPath))
.reply(200);

await httpProviderClient.upload({ localPath, uploadPath });
t.true(nock.isDone());
});
24 changes: 20 additions & 4 deletions packages/ingest/test/test-S3ProviderClient.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const { basename, dirname } = require('path');
const path = require('path');
const { tmpdir } = require('os');
const fs = require('fs');
const test = require('ava');

Expand Down Expand Up @@ -50,16 +51,16 @@ test.serial('S3ProviderClient.list lists objects from the bucket root with paths

const files = await s3ProviderClient.list('');
t.is(files.length, 1);
t.is(files[0].name, basename(t.context.sourceKey));
t.is(files[0].path, dirname(t.context.sourceKey));
t.is(files[0].name, path.basename(t.context.sourceKey));
t.is(files[0].path, path.dirname(t.context.sourceKey));
});

test.serial('S3ProviderClient.list lists objects under a path in a bucket', async (t) => {
const s3ProviderClient = new S3ProviderClient({ bucket: t.context.sourceBucket });

const files = await s3ProviderClient.list(t.context.sourcePrefix);
t.is(files.length, 1);
t.is(files[0].name, basename(t.context.sourceKey));
t.is(files[0].name, path.basename(t.context.sourceKey));
});

test.serial('S3ProviderClient.download downloads a file to local disk', async (t) => {
Expand Down Expand Up @@ -161,3 +162,18 @@ test.serial('S3ProviderClient.sync throws an error if the source file does not e
}
);
});

test.serial('S3ProviderClient.upload uploads a file', async (t) => {
const s3ProviderClient = new S3ProviderClient({ bucket: t.context.sourceBucket });
const localPath = path.join(tmpdir(), randomString());
t.teardown(() => fs.unlinkSync(localPath));
const uploadPath = path.join(randomString(), 'destinationfile.txt');

fs.writeFileSync(localPath, t.context.fileContent);
await s3ProviderClient.upload({ localPath, uploadPath });

t.is(
await S3.getTextObject(t.context.sourceBucket, uploadPath),
t.context.fileContent
);
});
Loading