
CUMULUS-2688: Update bulk operations to fetch granules by unique columns #3000

Merged (27 commits, Jul 26, 2022)
cbe0e44
Add endpoint configuration to get granule by collection and granule ID
npauzenga May 31, 2022
c6a12dd
add test for getGranuleByUniqueColumns
npauzenga Jun 6, 2022
1b519a0
add docs, export new granules function, fix test
npauzenga Jun 6, 2022
a0d78db
switch to collectionId instead of collection_cumulus_id for granule GET
npauzenga Jun 7, 2022
120ced7
Fix merge error
npauzenga Jun 7, 2022
3be9fb1
Update granule get to throw correct errors
npauzenga Jun 7, 2022
2588d6c
update api-client configuration with collectionId instead of collecti…
npauzenga Jun 7, 2022
cea0306
fix lint errors
npauzenga Jun 7, 2022
15415c3
refactor api-client function and deprecate lib function
npauzenga Jun 8, 2022
54734da
Update CHANGELOG.md
npauzenga Jun 8, 2022
720805b
update docs
npauzenga Jun 13, 2022
1fdde4b
Merge branch 'feature/rds-phase-3' into feature/CUMULUS-2688-new-gran…
npauzenga Jun 14, 2022
4804dff
update tests for bulk operation granule lookups using collectionId
npauzenga Jun 16, 2022
3af1c65
Merge branch 'feature/CUMULUS-2688-new-granule-endpoint' into feature…
npauzenga Jun 16, 2022
ee8e1f3
fix remaining bulk operation tests
npauzenga Jun 16, 2022
f3d62d5
Refactor tests for less duplication
npauzenga Jun 17, 2022
2006805
fix lint errors
npauzenga Jun 17, 2022
5a89f58
update bulk delete test for granule lookups
npauzenga Jun 21, 2022
20c8242
Update CHANGELOG.md
npauzenga Jun 21, 2022
c15136a
fix lint errors
npauzenga Jun 22, 2022
393d153
update endpoint logic for getting granules by unique keys
npauzenga Jun 22, 2022
709a18f
fix lint errors
npauzenga Jun 22, 2022
886a739
Update integration test to fetch granule by unique columns
npauzenga Jun 22, 2022
aff0ca6
update integration tests to fetch granules by unique columns
npauzenga Jun 22, 2022
1c28013
Merge branch 'feature/rds-phase-3' into feature/CUMULUS-2688-bulk-ope…
npauzenga Jul 22, 2022
cc0c468
fix lint and merge error
npauzenga Jul 25, 2022
3e445aa
Merge branch 'feature/rds-phase-3' into feature/CUMULUS-2688-bulk-ope…
npauzenga Jul 26, 2022
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

## Unreleased Phase 3

### Breaking Changes
npauzenga (Contributor Author): Wondering if there should be migration instructions for this... 🤔

npauzenga (Contributor Author): Yeah, good call. I'll update that repo once all three tickets for 2688 are in as they all have changes that'll need to be noted. It looks like we might need a new rds-phase-3 branch there that gets released at the same time as the main cumulus phase 3 release too 🤔

- **CUMULUS-2688**
- Updated bulk operation logic to use collectionId in addition to granuleId to fetch granules.
- Tasks using the `bulk-operation` Lambda should provide `collectionId` and `granuleId`, e.g. `{ granuleId: xxx, collectionId: xxx }`
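
The payload change described above can be sketched as follows. This is an editor's illustration, not code from the PR: the workflow name and IDs are made up, and `MOD09GQ___006` assumes the `name___version` format produced by `constructCollectionId`.

```javascript
// Before this PR: bulk endpoints accepted bare granule IDs.
const oldBody = {
  workflowName: 'HelloWorldWorkflow', // illustrative name
  ids: ['MOD09GQ.A2017025.h21v00.006.2017034065104'],
};

// After this PR: each entry pairs granuleId with collectionId so the
// granule can be fetched by its unique (granule_id, collection) columns.
const newBody = {
  workflowName: 'HelloWorldWorkflow',
  granules: [{
    granuleId: 'MOD09GQ.A2017025.h21v00.006.2017034065104',
    collectionId: 'MOD09GQ___006', // assumed name___version format
  }],
};
```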

### Changed

- **CUMULUS-2312** - RDS Migration Epic Phase 3
2 changes: 1 addition & 1 deletion example/spec/parallel/testAPI/bulkDeleteFailureSpec.js
@@ -73,7 +73,7 @@ describe('POST /granules/bulkDelete with a failed bulk delete operation', () =>
postBulkDeleteResponse = await bulkDeleteGranules({
prefix: config.stackName,
body: {
ids: [granule.granuleId],
granules: [{ granuleId: granule.granuleId, collectionId: constructCollectionId(collection.name, collection.version) }],
},
});
postBulkDeleteBody = JSON.parse(postBulkDeleteResponse.body);
4 changes: 2 additions & 2 deletions example/spec/parallel/testAPI/bulkDeleteSuccessSpec.js
@@ -26,7 +26,7 @@ const {
const { getGranuleWithStatus } = require('@cumulus/integration-tests/Granules');
const { createProvider } = require('@cumulus/integration-tests/Providers');
const { createOneTimeRule } = require('@cumulus/integration-tests/Rules');

const { constructCollectionId } = require('@cumulus/message/Collections');
const {
createTimestampedTestId,
createTestSuffix,
@@ -163,7 +163,7 @@ describe('POST /granules/bulkDelete', () => {
{
prefix,
body: {
ids: [granuleId],
granules: [{ granuleId, collectionId: constructCollectionId(collection.name, collection.version) }],
// required to force removal of granules from CMR before deletion
forceRemoveFromCmr: true,
},
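
These specs lean on `constructCollectionId` from `@cumulus/message/Collections`. As a hedged sketch of its assumed behavior (a `name___version` join; verify against the installed package before relying on it):

```javascript
// Assumed behavior of @cumulus/message/Collections' constructCollectionId:
// join the collection name and version with a '___' separator.
const constructCollectionId = (name, version) => `${name}___${version}`;
```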
3 changes: 2 additions & 1 deletion example/spec/parallel/testAPI/bulkOperationsSuccessSpec.js
@@ -26,6 +26,7 @@ const {
const { getGranuleWithStatus } = require('@cumulus/integration-tests/Granules');
const { createProvider } = require('@cumulus/integration-tests/Providers');
const { createOneTimeRule } = require('@cumulus/integration-tests/Rules');
const { constructCollectionId } = require('@cumulus/message/Collections');

const {
createTimestampedTestId,
@@ -167,7 +168,7 @@ describe('POST /granules/bulk', () => {
postBulkGranulesResponse = await granules.bulkGranules({
prefix,
body: {
ids: [granuleId],
granules: [{ granuleId, collectionId: constructCollectionId(collection.name, collection.version) }],
workflowName: 'HelloWorldWorkflow',
queueUrl: scheduleQueueUrl,
},
28 changes: 28 additions & 0 deletions packages/api-client/tests/test-granules.js
@@ -68,6 +68,34 @@ test('getGranule calls the callback with the expected object when there is query
}));
});

test('getGranule accepts an optional collectionId', async (t) => {
const expected = {
prefix: t.context.testPrefix,
payload: {
httpMethod: 'GET',
resource: '/{proxy+}',
path: `/granules/${t.context.collectionId}/${t.context.granuleId}`,
},
};

const callback = (configObject) => {
t.deepEqual(configObject, expected);
return Promise.resolve({
body: JSON.stringify({
granuleId: t.context.granuleId,
collectionId: t.context.collectionId,
}),
});
};

await t.notThrowsAsync(granulesApi.getGranule({
callback,
prefix: t.context.testPrefix,
granuleId: t.context.granuleId,
collectionId: t.context.collectionId,
}));
});

test('waitForGranules calls getGranules with the expected payload', async (t) => {
const callback = ({ prefix, payload }) => {
t.true(payload.path.endsWith(t.context.granuleId));
18 changes: 14 additions & 4 deletions packages/api/endpoints/granules.js
@@ -15,6 +15,7 @@ const {
ExecutionPgModel,
getKnexClient,
getUniqueGranuleByGranuleId,
getGranuleByUniqueColumns,
GranulePgModel,
translateApiGranuleToPostgresGranule,
translatePostgresCollectionToApiCollection,
@@ -548,8 +549,17 @@ async function getByGranuleId(req, res) {
const granuleId = req.params.granuleName;

let granule;
let pgCollection;
try {
granule = await getUniqueGranuleByGranuleId(knex, granuleId);
if (collectionId) {
pgCollection = await collectionPgModel.get(
knex, deconstructCollectionId(collectionId)
);

granule = await getGranuleByUniqueColumns(knex, granuleId, pgCollection.cumulus_id);
} else {
granule = await getUniqueGranuleByGranuleId(knex, granuleId);
}
} catch (error) {
if (error instanceof RecordDoesNotExist) {
if (granule === undefined) {
@@ -582,8 +592,8 @@ async function bulkOperations(req, res) {
let description;
if (payload.query) {
description = `Bulk run ${payload.workflowName} on ${payload.query.size} granules`;
} else if (payload.ids) {
description = `Bulk run ${payload.workflowName} on ${payload.ids.length} granules`;
} else if (payload.granules) {
description = `Bulk run ${payload.workflowName} on ${payload.granules.length} granules`;
} else {
description = `Bulk run on ${payload.workflowName}`;
}
@@ -671,7 +681,7 @@ async function bulkDelete(req, res) {
async function bulkReingest(req, res) {
const payload = req.body;
const numOfGranules = (payload.query && payload.query.size)
|| (payload.ids && payload.ids.length);
|| (payload.granules && payload.granules.length);
const description = `Bulk granule reingest run on ${numOfGranules || ''} granules`;

const asyncOperationId = uuidv4();
95 changes: 67 additions & 28 deletions packages/api/lambdas/bulk-operation.js
@@ -5,38 +5,47 @@ const Logger = require('@cumulus/logger');
const {
GranulePgModel,
getKnexClient,
getUniqueGranuleByGranuleId,
translatePostgresGranuleToApiGranule,
getGranuleByUniqueColumns,
CollectionPgModel,
} = require('@cumulus/db');
const { RecordDoesNotExist } = require('@cumulus/errors');

const { deconstructCollectionId } = require('@cumulus/message/Collections');
const { chooseTargetExecution } = require('../lib/executions');
const { deleteGranuleAndFiles } = require('../src/lib/granule-delete');
const { unpublishGranule } = require('../lib/granule-remove-from-cmr');
const { updateGranuleStatusToQueued } = require('../lib/writeRecords/write-granules');
const { getGranuleIdsForPayload } = require('../lib/granules');
const { getGranulesForPayload } = require('../lib/granules');
const { reingestGranule, applyWorkflow } = require('../lib/ingest');

const log = new Logger({ sender: '@cumulus/bulk-operation' });

async function applyWorkflowToGranules({
granuleIds,
granules,
npauzenga (Contributor Author): These aren't really granules. They're { collectionId, granuleId }. If this is confusing I can rename.

Contributor: Yeah, I think it should be something better across the stack, esp at the endpoint level if we're renaming there anyway. I just can't decide on a good name. 🤔

npauzenga (Contributor Author, Jul 20, 2022): Yeah that's where I'm getting stuck. It's really granuleUniqueIdsButNotCumulusIds 😄. Though the API doesn't return cumulus_id at all so maybe it would only be slightly confusing on the dev side. We could do something like granuleAndCollectionIds but that could also be ambiguous because "ID" can mean a couple things in our schema. It highlights the problem of not exposing cumulus_id to these endpoints/users. The more I think about it the more I'm leaning towards keeping granules. My reasoning (subject to change) is that you could pass an entire API Granule object and it would work. It's just that we really only need granuleId and collectionId.

Contributor: I can't find a convincing reason to disagree.

workflowName,
meta,
queueUrl,
granulePgModel = new GranulePgModel(),
collectionPgModel = new CollectionPgModel(),
granuleTranslateMethod = translatePostgresGranuleToApiGranule,
applyWorkflowHandler = applyWorkflow,
updateGranulesToQueuedMethod = updateGranuleStatusToQueued,
knex,
}) {
return await pMap(
granuleIds,
(async (granuleId) => {
granules,
(async (granule) => {
try {
const pgGranule = await getUniqueGranuleByGranuleId(
const collection = await collectionPgModel.get(
knex,
granuleId,
deconstructCollectionId(granule.collectionId)
);

const pgGranule = await getGranuleByUniqueColumns(
knex,
granule.granuleId,
collection.cumulus_id,
granulePgModel
);
const apiGranule = await granuleTranslateMethod({
@@ -51,10 +60,10 @@ async function applyWorkflowToGranules({
queueUrl,
asyncOperationId: process.env.asyncOperationId,
});
return granuleId;
return granule.granuleId;
} catch (error) {
log.error(`Granule ${granuleId} encountered an error`, error);
return { granuleId, err: error };
log.error(`Granule ${granule.granuleId} encountered an error`, error);
return { granuleId: granule.granuleId, err: error };
}
})
);
@@ -70,7 +79,8 @@
* @param {Object} [payload.query] - Optional parameter of query to send to ES
* @param {string} [payload.index] - Optional parameter of ES index to query.
* Must exist if payload.query exists.
* @param {Object} [payload.ids] - Optional list of granule IDs to bulk operate on
* @param {Object} [payload.granules] - Optional list of granule unique IDs to bulk operate on
e.g. { granuleId: xxx, collectionId: xxx }
* @param {Function} [unpublishGranuleFunc] - Optional function to delete the
* granule from CMR. Useful for testing.
* @returns {Promise}
@@ -81,16 +91,28 @@ async function bulkGranuleDelete(
) {
const deletedGranules = [];
const forceRemoveFromCmr = payload.forceRemoveFromCmr === true;
const granuleIds = await getGranuleIdsForPayload(payload);
const granules = await getGranulesForPayload(payload);
const knex = await getKnexClient();

await pMap(
granuleIds,
async (granuleId) => {
granules,
async (granule) => {
let pgGranule;
const granulePgModel = new GranulePgModel();
const collectionPgModel = new CollectionPgModel();

const collection = await collectionPgModel.get(
knex,
deconstructCollectionId(granule.collectionId)
);

try {
pgGranule = await getUniqueGranuleByGranuleId(knex, granuleId, granulePgModel);
pgGranule = await getGranuleByUniqueColumns(
knex,
granule.granuleId,
collection.cumulus_id,
granulePgModel
);
} catch (error) {
// PG Granule being undefined will be caught by deleteGranulesAndFiles
if (error instanceof RecordDoesNotExist) {
@@ -112,7 +134,7 @@ async function bulkGranuleDelete(
pgGranule,
});

deletedGranules.push(granuleId);
deletedGranules.push(granule.granuleId);
},
{
concurrency: 10, // is this necessary?
@@ -133,17 +155,18 @@ async function bulkGranuleDelete(
* @param {Object} [payload.query] - Optional parameter of query to send to ES
* @param {string} [payload.index] - Optional parameter of ES index to query.
* Must exist if payload.query exists.
* @param {Object} [payload.ids] - Optional list of granule IDs to bulk operate on
* @param {Object} [payload.granules] - Optional list of granule unique IDs to bulk operate on
e.g. { granuleId: xxx, collectionId: xxx }
* @param {function} [applyWorkflowHandler] - Optional handler for testing
* @returns {Promise}
*/
async function bulkGranule(payload, applyWorkflowHandler) {
const knex = await getKnexClient();
const { queueUrl, workflowName, meta } = payload;
const granuleIds = await getGranuleIdsForPayload(payload);
const granules = await getGranulesForPayload(payload);
return await applyWorkflowToGranules({
knex,
granuleIds,
granules,
meta,
queueUrl,
workflowName,
@@ -155,22 +178,38 @@ async function bulkGranuleReingest(
payload,
reingestHandler = reingestGranule
) {
const granuleIds = await getGranuleIdsForPayload(payload);
log.info(`Starting bulkGranuleReingest for ${JSON.stringify(granuleIds)}`);
const granules = await getGranulesForPayload(payload);
log.info(`Starting bulkGranuleReingest for ${JSON.stringify(granules)}`);
const knex = await getKnexClient();

const workflowName = payload.workflowName;
return await pMap(
granuleIds,
async (granuleId) => {
granules,
async (granule) => {
const granulePgModel = new GranulePgModel();
const collectionPgModel = new CollectionPgModel();

const collection = await collectionPgModel.get(
knex,
deconstructCollectionId(granule.collectionId)
);

try {
const pgGranule = await getUniqueGranuleByGranuleId(knex, granuleId);
const pgGranule = await getGranuleByUniqueColumns(
knex,
granule.granuleId,
collection.cumulus_id,
granulePgModel
);
const apiGranule = await translatePostgresGranuleToApiGranule({
granulePgRecord: pgGranule,
knexOrTransaction: knex,
});

const targetExecution = await chooseTargetExecution({ granuleId, workflowName });
const targetExecution = await chooseTargetExecution({
granuleId: granule.granuleId,
workflowName,
});
const apiGranuleToReingest = {
...apiGranule,
...(targetExecution && { execution: targetExecution }),
Expand All @@ -180,10 +219,10 @@ async function bulkGranuleReingest(
apiGranule: apiGranuleToReingest,
asyncOperationId: process.env.asyncOperationId,
});
return granuleId;
return granule.granuleId;
} catch (error) {
log.error(`Granule ${granuleId} encountered an error`, error);
return { granuleId, err: error };
log.error(`Granule ${granule.granuleId} encountered an error`, error);
return { granuleId: granule.granuleId, err: error };
}
},
{
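
The lookup pattern this Lambda now repeats (resolve the collection row for its `cumulus_id`, then fetch the granule by its unique columns) can be sketched with in-memory stand-ins for the Pg models. Everything below is an editor's illustration: only the idea and the `name___version` collectionId format come from the diff, and the stub tables and function bodies are assumptions, not Cumulus code.

```javascript
// Assumed 'name___version' format, mirroring deconstructCollectionId.
const deconstructCollectionId = (collectionId) => {
  const [name, version] = collectionId.split('___');
  return { name, version };
};

// In-memory stand-ins for the Knex-backed CollectionPgModel/GranulePgModel.
const collectionTable = { MOD09GQ___006: { cumulus_id: 1 } };
const granuleTable = {
  '1|granule-001': { granule_id: 'granule-001', status: 'completed' },
};

// Only granuleId and collectionId are read from each payload entry, which is
// why passing a full API granule record also works (extra fields are ignored).
// The real lookups are async Knex queries; this sketch is synchronous.
function fetchGranuleByUniqueColumns({ granuleId, collectionId }) {
  const { name, version } = deconstructCollectionId(collectionId);
  const collection = collectionTable[`${name}___${version}`];
  return granuleTable[`${collection.cumulus_id}|${granuleId}`];
}
```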
2 changes: 1 addition & 1 deletion packages/api/lib/granules.js
@@ -320,7 +320,7 @@ async function getGranuleIdsForPayload(payload) {
* @param {Object} [payload.query] - Optional parameter of query to send to ES
* @param {string} [payload.index] - Optional parameter of ES index to query.
* Must exist if payload.query exists.
* @returns {Promise<Array<Object>>}
* @returns {Promise<Array<ApiGranule>>}
*/
async function getGranulesForPayload(payload) {
const { granules, index, query } = payload;
13 changes: 7 additions & 6 deletions packages/api/lib/request.js
@@ -35,19 +35,20 @@ async function verifyJwtAuthorization(requestJwtToken) {
return accessToken;
}

// TODO no tests for this
function validateBulkGranulesRequest(req, res, next) {
const payload = req.body;

if (!payload.ids && !payload.query) {
return res.boom.badRequest('One of ids or query is required');
if (!payload.granules && !payload.query) {
return res.boom.badRequest('One of granules or query is required');
}

if (payload.ids && !Array.isArray(payload.ids)) {
return res.boom.badRequest(`ids should be an array of values, received ${payload.ids}`);
if (payload.granules && !Array.isArray(payload.granules)) {
return res.boom.badRequest(`granules should be an array of values, received ${payload.granules}`);
}

if (!payload.query && payload.ids && payload.ids.length === 0) {
return res.boom.badRequest('no values provided for ids');
if (!payload.query && payload.granules && payload.granules.length === 0) {
return res.boom.badRequest('no values provided for granules');
}

if (payload.query
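
The `// TODO no tests for this` note above could be closed out with a unit test along these lines. The validator body is copied from the diff (minus the truncated `payload.query`/`index` check), while the req/res stubs are the editor's sketch of the express-boom interface:

```javascript
// Copied from the diff above so the sketch is self-contained; the trailing
// query/index validation is truncated in the diff and omitted here.
function validateBulkGranulesRequest(req, res, next) {
  const payload = req.body;
  if (!payload.granules && !payload.query) {
    return res.boom.badRequest('One of granules or query is required');
  }
  if (payload.granules && !Array.isArray(payload.granules)) {
    return res.boom.badRequest(`granules should be an array of values, received ${payload.granules}`);
  }
  if (!payload.query && payload.granules && payload.granules.length === 0) {
    return res.boom.badRequest('no values provided for granules');
  }
  return next();
}

// Minimal hand-rolled stub for the express res object with a boom helper.
function makeRes() {
  const errors = [];
  return { errors, boom: { badRequest: (msg) => { errors.push(msg); return msg; } } };
}

// A valid payload should call next() and record no errors.
let nextCalled = false;
const okRes = makeRes();
validateBulkGranulesRequest(
  { body: { granules: [{ granuleId: 'g1', collectionId: 'c1' }] } },
  okRes,
  () => { nextCalled = true; }
);

// An empty payload should be rejected with a 400.
const badRes = makeRes();
validateBulkGranulesRequest({ body: {} }, badRes, () => {});
```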