Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 74 additions & 212 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import {
hasAtomicOperators,
Callback,
MongoDBNamespace,
maxWireVersion,
getTopology,
resolveOptions
} from '../utils';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
import { UpdateOperation, UpdateStatement } from '../operations/update';
import { DeleteOperation, DeleteStatement } from '../operations/delete';
import { UpdateOperation, UpdateStatement, makeUpdateStatement } from '../operations/update';
import { DeleteOperation, DeleteStatement, makeDeleteStatement } from '../operations/delete';
import { WriteConcern } from '../write_concern';
import type { Collection } from '../collection';
import type { Topology } from '../sdam/topology';
Expand Down Expand Up @@ -727,97 +726,66 @@ export class FindOperators {

/** Add a multiple update operation to the bulk operation */
update(updateDocument: Document): BulkOperationBase {
if (!this.bulkOperation.s.currentOp) {
this.bulkOperation.s.currentOp = {};
}

// Perform upsert
const upsert =
typeof this.bulkOperation.s.currentOp.upsert === 'boolean'
? this.bulkOperation.s.currentOp.upsert
: false;

// Establish the update command
const document: Document = {
q: this.bulkOperation.s.currentOp.selector,
u: updateDocument,
multi: true,
upsert: upsert
};

if (updateDocument.hint) {
document.hint = updateDocument.hint;
}

// Clear out current Op
this.bulkOperation.s.currentOp = undefined;
return this.bulkOperation.addToOperationsList(BatchType.UPDATE, document);
const currentOp = buildCurrentOp(this.bulkOperation);
return this.bulkOperation.addToOperationsList(
BatchType.UPDATE,
makeUpdateStatement(currentOp.selector, updateDocument, {
...currentOp,
multi: true
})
);
}

/** Add a single update operation to the bulk operation */
updateOne(updateDocument: Document): BulkOperationBase {
if (!this.bulkOperation.s.currentOp) {
this.bulkOperation.s.currentOp = {};
}

// Perform upsert
const upsert =
typeof this.bulkOperation.s.currentOp.upsert === 'boolean'
? this.bulkOperation.s.currentOp.upsert
: false;

// Establish the update command
const document: Document = {
q: this.bulkOperation.s.currentOp.selector,
u: updateDocument,
multi: false,
upsert: upsert
};

if (updateDocument.hint) {
document.hint = updateDocument.hint;
}

if (!hasAtomicOperators(updateDocument)) {
throw new TypeError('Update document requires atomic operators');
}

// Clear out current Op
this.bulkOperation.s.currentOp = undefined;
return this.bulkOperation.addToOperationsList(BatchType.UPDATE, document);
const currentOp = buildCurrentOp(this.bulkOperation);
return this.bulkOperation.addToOperationsList(
BatchType.UPDATE,
makeUpdateStatement(currentOp.selector, updateDocument, { ...currentOp, multi: false })
);
}

/** Add a replace one operation to the bulk operation */
replaceOne(replacement: Document): BulkOperationBase {
if (!this.bulkOperation.s.currentOp) {
this.bulkOperation.s.currentOp = {};
if (hasAtomicOperators(replacement)) {
throw new TypeError('Replacement document must not use atomic operators');
}

// Perform upsert
const upsert =
typeof this.bulkOperation.s.currentOp.upsert === 'boolean'
? this.bulkOperation.s.currentOp.upsert
: false;

// Establish the update command
const document: Document = {
q: this.bulkOperation.s.currentOp.selector,
u: replacement,
multi: false,
upsert: upsert
};
const currentOp = buildCurrentOp(this.bulkOperation);
return this.bulkOperation.addToOperationsList(
BatchType.UPDATE,
makeUpdateStatement(currentOp.selector, replacement, { ...currentOp, multi: false })
);
}

if (replacement.hint) {
document.hint = replacement.hint;
}
/** Add a delete one operation to the bulk operation */
deleteOne(): BulkOperationBase {
const currentOp = buildCurrentOp(this.bulkOperation);
return this.bulkOperation.addToOperationsList(
BatchType.DELETE,
makeDeleteStatement(currentOp.selector, { ...currentOp, limit: 1 })
);
}

if (hasAtomicOperators(replacement)) {
throw new TypeError('Replacement document must not use atomic operators');
}
/** Add a delete many operation to the bulk operation */
delete(): BulkOperationBase {
const currentOp = buildCurrentOp(this.bulkOperation);
return this.bulkOperation.addToOperationsList(
BatchType.DELETE,
makeDeleteStatement(currentOp.selector, { ...currentOp, limit: 0 })
);
}

removeOne(): BulkOperationBase {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remind me, is there a ticket to remove these duplicated methods for v4?

@emadum emadum Dec 18, 2020

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't able to find one, it's not in the main ticket tracking deprecation (NODE-2317). Do you think we should get rid of them?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think now's the time to do it!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added NODE-2978 so as to not scope creep more on this PR 👍

return this.deleteOne();
}

// Clear out current Op
this.bulkOperation.s.currentOp = undefined;
return this.bulkOperation.addToOperationsList(BatchType.UPDATE, document);
remove(): BulkOperationBase {
return this.delete();
}

/** Upsert modifier for update bulk operation, noting that this operation is an upsert. */
Expand All @@ -830,46 +798,14 @@ export class FindOperators {
return this;
}

/** Add a delete one operation to the bulk operation */
deleteOne(): BulkOperationBase {
/** Specifies the collation for the query condition. */
collation(collation: CollationOptions): this {
if (!this.bulkOperation.s.currentOp) {
this.bulkOperation.s.currentOp = {};
}

// Establish the update command
const document = {
q: this.bulkOperation.s.currentOp.selector,
limit: 1
};

// Clear out current Op
this.bulkOperation.s.currentOp = undefined;
return this.bulkOperation.addToOperationsList(BatchType.DELETE, document);
}

/** Add a delete many operation to the bulk operation */
delete(): BulkOperationBase {
if (!this.bulkOperation.s.currentOp) {
this.bulkOperation.s.currentOp = {};
}

// Establish the update command
const document = {
q: this.bulkOperation.s.currentOp.selector,
limit: 0
};

// Clear out current Op
this.bulkOperation.s.currentOp = undefined;
return this.bulkOperation.addToOperationsList(BatchType.DELETE, document);
}

removeOne(): BulkOperationBase {
return this.deleteOne();
}

remove(): BulkOperationBase {
return this.delete();
this.bulkOperation.s.currentOp.collation = collation;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should expand the scope of this PR, but I just wanted to comment on something so you can keep it in mind. We have "builders" for a number of things in the driver, specifically for bulk and cursors. I think it would be really great for readability/maintainability if you were doing them all the same way. The bulk builders need a lot of work, and are confusingly coupled, but I think you could use a [kBuiltOptions] approach here (or better yet, wrap [kBuiltOptions] in some well-typed methods like addToCurrentOperation that you can reuse as a toolkit across all "builders" in the codebase).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that it'd be nice to have a single approach to built options. There's already a new ticket for adding a builder method for arrayFilters (NODE-2751), so perhaps we should prioritize this work as part of NODE-2734.

return this;
}
}

Expand Down Expand Up @@ -1136,19 +1072,23 @@ export abstract class BulkOperationBase {

if ('replaceOne' in op || 'updateOne' in op || 'updateMany' in op) {
if ('replaceOne' in op) {
const updateStatement = makeUpdateStatement(this.s.topology, op.replaceOne, false);
const updateStatement = makeUpdateStatement(
op.replaceOne.filter,
op.replaceOne.replacement,
{ ...op.replaceOne, multi: false }
);
if (hasAtomicOperators(updateStatement.u)) {
throw new TypeError('Replacement document must not use atomic operators');
}

return this.addToOperationsList(
BatchType.UPDATE,
makeUpdateStatement(this.s.topology, op.replaceOne, false)
);
return this.addToOperationsList(BatchType.UPDATE, updateStatement);
}

if ('updateOne' in op) {
const updateStatement = makeUpdateStatement(this.s.topology, op.updateOne, false);
const updateStatement = makeUpdateStatement(op.updateOne.filter, op.updateOne.update, {
...op.updateOne,
multi: false
});
if (!hasAtomicOperators(updateStatement.u)) {
throw new TypeError('Update document requires atomic operators');
}
Expand All @@ -1157,7 +1097,10 @@ export abstract class BulkOperationBase {
}

if ('updateMany' in op) {
const updateStatement = makeUpdateStatement(this.s.topology, op.updateMany, true);
const updateStatement = makeUpdateStatement(op.updateMany.filter, op.updateMany.update, {
...op.updateMany,
multi: true
});
if (!hasAtomicOperators(updateStatement.u)) {
throw new TypeError('Update document requires atomic operators');
}
Expand All @@ -1169,28 +1112,28 @@ export abstract class BulkOperationBase {
if ('removeOne' in op) {
return this.addToOperationsList(
BatchType.DELETE,
makeDeleteStatement(this.s.topology, op.removeOne, false)
makeDeleteStatement(op.removeOne.filter, { ...op.removeOne, limit: 1 })
);
}

if ('removeMany' in op) {
return this.addToOperationsList(
BatchType.DELETE,
makeDeleteStatement(this.s.topology, op.removeMany, true)
makeDeleteStatement(op.removeMany.filter, { ...op.removeMany, limit: 0 })
);
}

if ('deleteOne' in op) {
return this.addToOperationsList(
BatchType.DELETE,
makeDeleteStatement(this.s.topology, op.deleteOne, false)
makeDeleteStatement(op.deleteOne.filter, { ...op.deleteOne, limit: 1 })
);
}

if ('deleteMany' in op) {
return this.addToOperationsList(
BatchType.DELETE,
makeDeleteStatement(this.s.topology, op.deleteMany, true)
makeDeleteStatement(op.deleteMany.filter, { ...op.deleteMany, limit: 0 })
);
}

Expand Down Expand Up @@ -1328,94 +1271,6 @@ function shouldForceServerObjectId(bulkOperation: BulkOperationBase): boolean {
return false;
}

function makeUpdateStatement(
topology: Topology,
model: ReplaceOneModel | UpdateOneModel | UpdateManyModel,
multi: boolean
): UpdateStatement {
// NOTE: legacy support for a raw statement, consider removing
if (isUpdateStatement(model)) {
if ('collation' in model && maxWireVersion(topology) < 5) {
throw new TypeError('Topology does not support collation');
}

return model as UpdateStatement;
}

const statement: UpdateStatement = {
q: model.filter,
u: 'update' in model ? model.update : model.replacement,
multi,
upsert: 'upsert' in model ? model.upsert : false
};

if ('collation' in model) {
if (maxWireVersion(topology) < 5) {
throw new TypeError('Topology does not support collation');
}

statement.collation = model.collation;
}

if ('arrayFilters' in model) {
// TODO: this check should be done at command construction against a connection, not a topology
if (maxWireVersion(topology) < 6) {
throw new TypeError('arrayFilters are only supported on MongoDB 3.6+');
}

statement.arrayFilters = model.arrayFilters;
}

if ('hint' in model) {
statement.hint = model.hint;
}

return statement;
}

function isUpdateStatement(model: Document): model is UpdateStatement {
return 'q' in model;
}

function makeDeleteStatement(
topology: Topology,
model: DeleteOneModel | DeleteManyModel,
multi: boolean
): DeleteStatement {
// NOTE: legacy support for a raw statement, consider removing
if (isDeleteStatement(model)) {
if ('collation' in model && maxWireVersion(topology) < 5) {
throw new TypeError('Topology does not support collation');
}

model.limit = multi ? 0 : 1;
return model as DeleteStatement;
}

const statement: DeleteStatement = {
q: model.filter,
limit: multi ? 0 : 1
};

if ('collation' in model) {
if (maxWireVersion(topology) < 5) {
throw new TypeError('Topology does not support collation');
}

statement.collation = model.collation;
}

if ('hint' in model) {
statement.hint = model.hint;
}

return statement;
}

function isDeleteStatement(model: Document): model is DeleteStatement {
return 'q' in model;
}

function isInsertBatch(batch: Batch): boolean {
return batch.batchType === BatchType.INSERT;
}
Expand All @@ -1427,3 +1282,10 @@ function isUpdateBatch(batch: Batch): batch is Batch<UpdateStatement> {
function isDeleteBatch(batch: Batch): batch is Batch<DeleteStatement> {
return batch.batchType === BatchType.DELETE;
}

function buildCurrentOp(bulkOp: BulkOperationBase): Document {
let { currentOp } = bulkOp.s;
bulkOp.s.currentOp = undefined;
if (!currentOp) currentOp = {};
return currentOp;
}
2 changes: 2 additions & 0 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ export class Collection {
* ```js
* { insertOne: { document: { a: 1 } } }
*
* { insertMany: [{ g: 1 }, { g: 2 }]}
*
* { updateOne: { filter: {a:2}, update: {$set: {a:2}}, upsert:true } }
*
* { updateMany: { filter: {a:2}, update: {$set: {a:2}}, upsert:true } }
Expand Down
Loading