Merge branch 'master' into CUMULUS-748
Jkovarik authored Jul 3, 2018
2 parents 801b2b8 + 30c7e01 commit 1681b11
Showing 6 changed files with 144 additions and 33 deletions.
18 changes: 16 additions & 2 deletions .circleci/config.yml
@@ -83,10 +83,24 @@ jobs:
- ./tasks/sf-sns-report/node_modules

- run:
name: Running Tests
name: Running Packages Tests
environment:
LOCALSTACK_HOST: localstack
command: yarn test
command: |
cp lerna.json lerna.json.backup
cat lerna.json.backup | jq '.packages = ["packages/*"]' > lerna.json
yarn test
- run:
name: Running Tasks Tests
environment:
LOCALSTACK_HOST: localstack
command: |
cat lerna.json.backup | jq '.packages = ["tasks/*"]' > lerna.json
yarn test
# undo all changes to lerna.json
git checkout -- lerna.json
- run:
name: Running End to End Tests
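
For illustration only: the two jq one-liners above just rewrite the `packages` globs in lerna.json so that each test pass covers a single package group. A rough Node equivalent of that transformation (the script name and CLI argument are assumptions; the CI job itself does this inline with cp and jq):

```js
// rewrite-lerna-packages.js — illustrative sketch only; the CI job uses cp + jq inline.
const fs = require('fs');

// Preserve the original config once, as the first CI step does with `cp`.
if (!fs.existsSync('lerna.json.backup')) {
  fs.copyFileSync('lerna.json', 'lerna.json.backup');
}

// Scope lerna to one package group per test pass, e.g. "packages/*" or "tasks/*".
const scope = process.argv[2] || 'packages/*';
const lernaConfig = JSON.parse(fs.readFileSync('lerna.json.backup', 'utf8'));
lernaConfig.packages = [scope];
fs.writeFileSync('lerna.json', JSON.stringify(lernaConfig, null, 2));
```

Splitting the run this way means each `yarn test` (which maps to `nyc lerna run test`, see package.json below) only covers one package group at a time, and the final `git checkout -- lerna.json` restores the committed config.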
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,13 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- **CUMULUS-681** - Add ingest-in-place action to granules endpoint
- new applyWorkflow action at PUT /granules/{granuleid}. Applying a workflow takes additional parameters (see the example request body below):
- workflow - the workflow name
- messageSource - 'input' or 'output' from previous execution
- metaOverride - overrides the meta of the new execution, accepts partial override
- payloadOverride - overrides the payload of the new execution, accepts partial override

### Fixed

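For context, a request body exercising the new action might look like the sketch below. The field names come from the CHANGELOG entry above and the tests in this commit; the override values are invented placeholders.

```js
// Sketch of a PUT /granules/{granuleid} body for the applyWorkflow action.
// `action`, `workflow`, and `messageSource` mirror the tests in this commit;
// the override values are made up to show the shape of a partial override.
const body = {
  action: 'applyWorkflow',
  workflow: 'inPlaceWorkflow',   // name of the workflow to start
  messageSource: 'output',       // build the new message from the previous execution's 'input' or 'output'
  metaOverride: { retries: 1 },  // optional, deep-merged into the original meta
  payloadOverride: {}            // optional, deep-merged into the original payload
};
```
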
2 changes: 1 addition & 1 deletion package.json
@@ -4,7 +4,7 @@
"description": "Cumulus Framework for ingesting and processing Nasa Earth data streams",
"scripts": {
"e2e": "ava tests/*.js --serial",
"test": "nyc lerna run test",
"test": "nyc lerna run test --concurrency 2",
"bootstrap": "lerna bootstrap --npm-client=npm",
"ybootstrap": "lerna bootstrap",
"bootstrap-no-build": "lerna bootstrap --ignore-scripts",
16 changes: 9 additions & 7 deletions packages/api/endpoints/granules.js
@@ -22,7 +22,7 @@ function list(event, cb) {

/**
* Update a single granule.
* Supported Actions: reingest, Remove From CMR.
* Supported Actions: reingest, applyWorkflow, RemoveFromCMR.
*
* @param {Object} event - aws lambda event object.
* @returns {Promise} response from the actions
@@ -38,12 +38,14 @@ async function put(event) {
if (action) {
const response = await g.get({ granuleId });
if (action === 'reingest') {
await g.reingest(response);
return {
granuleId: response.granuleId,
action,
status: 'SUCCESS'
};
return await g.reingest(response);
}
if (action === 'applyWorkflow') {
const workflow = _get(body, 'workflow');
const messageSource = _get(body, 'messageSource');
const metaOverride = _get(body, 'metaOverride');
const payloadOverride = _get(body, 'payloadOverride');
return await g.applyWorkflow(response, workflow, messageSource, metaOverride, payloadOverride);
}
else if (action === 'removeFromCmr') {
await g.removeGranuleFromCmr(response.granuleId, response.collectionId);
73 changes: 52 additions & 21 deletions packages/api/models/granules.js
@@ -3,6 +3,7 @@
const path = require('path');
const get = require('lodash.get');
const clonedeep = require('lodash.clonedeep');
const merge = require('lodash.merge');
const uniqBy = require('lodash.uniqby');
const cmrjs = require('@cumulus/cmrjs');
const { CMR } = require('@cumulus/cmrjs');
@@ -63,29 +63,10 @@ class Granule extends Manager {
* start the re-ingest of a given granule object
*
* @param {Object} g - the granule object
* @returns {Promise} an object showing the start of the re-ingest
* @returns {Promise<Object>} an object showing the start of the re-ingest
*/
async reingest(g) {
const { name, version } = deconstructCollectionId(g.collectionId);

// get the payload of the original execution
const status = await aws.StepFunction.getExecutionStatus(path.basename(g.execution));
const originalMessage = JSON.parse(status.execution.input);

const payload = await Rule.buildPayload({
workflow: 'IngestGranule',
provider: g.provider,
collection: {
name,
version
},
meta: originalMessage.meta,
payload: originalMessage.payload
});

await this.updateStatus({ granuleId: g.granuleId }, 'running');

await aws.invoke(process.env.invoke, payload);
await this.applyWorkflow(g, 'IngestGranule', 'input');
return {
granuleId: g.granuleId,
action: 'reingest',
@@ -94,6 +76,55 @@ }
}

/**
* apply a workflow to a given granule object
*
* @param {Object} g - the granule object
* @param {string} workflow - the workflow name
* @param {string} messageSource - 'input' or 'output' from previous execution
* @param {Object} metaOverride - overrides the meta of the new execution, accepts partial override
* @param {Object} payloadOverride - overrides the payload of the new execution, accepts partial override
* @returns {Promise<Object>} an object showing the start of the workflow execution
*/
async applyWorkflow(g, workflow, messageSource, metaOverride, payloadOverride) {
const { name, version } = deconstructCollectionId(g.collectionId);

try {
// get the payload of the original execution
const status = await aws.StepFunction.getExecutionStatus(path.basename(g.execution));
const originalMessage = JSON.parse(status.execution[messageSource]);

const payload = await Rule.buildPayload({
workflow,
provider: g.provider,
collection: {
name,
version
},
meta: metaOverride ? merge(originalMessage.meta, metaOverride) : originalMessage.meta,
payload: payloadOverride ? merge(originalMessage.payload, payloadOverride) : originalMessage.payload
});

await this.updateStatus({ granuleId: g.granuleId }, 'running');

await aws.invoke(process.env.invoke, payload);
return {
granuleId: g.granuleId,
action: `applyWorkflow ${workflow}`,
status: 'SUCCESS'
};
}
catch (e) {
log.error(g.granuleId, e);
return {
granuleId: g.granuleId,
action: `applyWorkflow ${workflow}`,
status: 'FAILED',
error: e.message
};
}
}

/**
* Move a granule's files to destination locations specified
*
* @param {Object} g - the granule object
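A small sketch of how the `merge`-based overrides above behave: `lodash.merge` deep-merges the override into the original message fields rather than replacing them wholesale. All values below are invented.

```js
const merge = require('lodash.merge');

// meta pulled from the previous execution's message (invented values)
const originalMeta = { provider: 'my-provider', retries: 3, cmr: { clientId: 'my-client' } };

// the caller supplies only the keys it wants to change
const metaOverride = { retries: 1 };

// untouched nested keys survive; note that merge also mutates its first argument
console.log(merge(originalMeta, metaOverride));
// => { provider: 'my-provider', retries: 1, cmr: { clientId: 'my-client' } }
```
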
61 changes: 59 additions & 2 deletions packages/api/tests/test-endpoints-granules.js
@@ -148,8 +148,8 @@ test('reingest a granule', async (t) => {

// fake workflow
process.env.bucket = process.env.internal;
const payload = JSON.parse(fakeSFResponse.execution.input);
const key = `${process.env.stackName}/workflows/${payload.meta.workflow_name}.json`;
const message = JSON.parse(fakeSFResponse.execution.input);
const key = `${process.env.stackName}/workflows/${message.meta.workflow_name}.json`;
await aws.s3().putObject({ Bucket: process.env.bucket, Key: key, Body: 'test data' }).promise();

sinon.stub(
@@ -170,6 +170,63 @@
StepFunction.getExecutionStatus.restore();
});

test('apply an in-place workflow to an existing granule', async (t) => {
const fakeSFResponse = {
execution: {
input: JSON.stringify({
meta: {
workflow_name: 'inPlaceWorkflow'
},
payload: {}
})
}
};

const event = {
httpMethod: 'PUT',
pathParameters: {
granuleName: fakeGranules[0].granuleId
},
body: JSON.stringify({
action: 'applyWorkflow',
workflow: 'inPlaceWorkflow',
messageSource: 'output'
})
};

// fake in-place workflow
process.env.bucket = process.env.internal;
const message = JSON.parse(fakeSFResponse.execution.input);
const key = `${process.env.stackName}/workflows/${message.meta.workflow_name}.json`;
await aws.s3().putObject({ Bucket: process.env.bucket, Key: key, Body: 'fake in-place workflow' }).promise();

// return fake previous execution
sinon.stub(
StepFunction,
'getExecutionStatus'
).callsFake(() => Promise.resolve({
execution: {
output: JSON.stringify({
meta: {
workflow_name: 'IngestGranule'
},
payload: {}
})
}
}));

await testEndpoint(granuleEndpoint, event, (response) => {
const body = JSON.parse(response.body);
t.is(body.status, 'SUCCESS');
t.is(body.action, 'applyWorkflow inPlaceWorkflow');
return response;
});

const updatedGranule = await g.get({ granuleId: fakeGranules[0].granuleId });
t.is(updatedGranule.status, 'running');

StepFunction.getExecutionStatus.restore();
});

test('remove a granule from CMR', async (t) => {
const event = {
