From 8785132a3adcd643921d3adfc080d0bc4baed936 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 9 Oct 2024 11:46:36 -0400 Subject: [PATCH] refactor(NODE-6398): bulkWrite internals to use async/await (#4252) --- src/bulk/common.ts | 200 ++++++++++--------------- src/bulk/unordered.ts | 7 +- test/integration/crud/crud_api.test.ts | 22 ++- 3 files changed, 93 insertions(+), 136 deletions(-) diff --git a/src/bulk/common.ts b/src/bulk/common.ts index c133a57d22..a62d62a4a5 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -1,5 +1,3 @@ -import { promisify } from 'util'; - import { type BSONSerializeOptions, type Document, EJSON, resolveBSONOptions } from '../bson'; import type { Collection } from '../collection'; import { @@ -7,6 +5,7 @@ import { MongoBatchReExecutionError, MONGODB_ERROR_CODES, MongoInvalidArgumentError, + MongoRuntimeError, MongoServerError, MongoWriteConcernError } from '../error'; @@ -22,7 +21,6 @@ import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import { applyRetryableWrites, - type Callback, getTopology, hasAtomicOperators, maybeAddIdToDocuments, @@ -500,86 +498,46 @@ export function mergeBatchResults( } } -function executeCommands( +async function executeCommands( bulkOperation: BulkOperationBase, - options: BulkWriteOptions, - callback: Callback -) { + options: BulkWriteOptions +): Promise { if (bulkOperation.s.batches.length === 0) { - return callback( - undefined, - new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered) - ); + return new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); } - const batch = bulkOperation.s.batches.shift() as Batch; + for (const batch of bulkOperation.s.batches) { + const finalOptions = resolveOptions(bulkOperation, { + ...options, + ordered: bulkOperation.isOrdered + }); - function resultHandler(err?: AnyError, result?: Document) { - // Error is a driver related error not a bulk op error, return early - if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) { - return callback( - new MongoBulkWriteError( - err, - new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered) - ) - ); + if (finalOptions.bypassDocumentValidation !== true) { + delete finalOptions.bypassDocumentValidation; } - if (err instanceof MongoWriteConcernError) { - return handleMongoWriteConcernError( - batch, - bulkOperation.s.bulkResult, - bulkOperation.isOrdered, - err, - callback - ); + // Is the bypassDocumentValidation options specific + if (bulkOperation.s.bypassDocumentValidation === true) { + finalOptions.bypassDocumentValidation = true; } - // Merge the results together - mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result); - const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); - if (bulkOperation.handleWriteError(callback, writeResult)) return; - - // Execute the next command in line - executeCommands(bulkOperation, options, callback); - } - - const finalOptions = resolveOptions(bulkOperation, { - ...options, - ordered: bulkOperation.isOrdered - }); - - if (finalOptions.bypassDocumentValidation !== true) { - delete finalOptions.bypassDocumentValidation; - } - - // Set an operationIf if provided - if (bulkOperation.operationId) { - resultHandler.operationId = bulkOperation.operationId; - } - - // Is the bypassDocumentValidation options specific - if (bulkOperation.s.bypassDocumentValidation === true) { - finalOptions.bypassDocumentValidation = true; - } - - // Is the checkKeys option disabled - if (bulkOperation.s.checkKeys === false) { - finalOptions.checkKeys = false; - } - - if (finalOptions.retryWrites) { - if (isUpdateBatch(batch)) { - finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi); + // Is the checkKeys option disabled + if (bulkOperation.s.checkKeys === false) { + finalOptions.checkKeys = false; } - if (isDeleteBatch(batch)) { - finalOptions.retryWrites = - finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0); + if (finalOptions.retryWrites) { + if (isUpdateBatch(batch)) { + finalOptions.retryWrites = + finalOptions.retryWrites && !batch.operations.some(op => op.multi); + } + + if (isDeleteBatch(batch)) { + finalOptions.retryWrites = + finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0); + } } - } - try { const operation = isInsertBatch(batch) ? new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions) : isUpdateBatch(batch) @@ -588,39 +546,50 @@ function executeCommands( ? new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions) : null; - if (operation != null) { - executeOperation(bulkOperation.s.collection.client, operation).then( - result => resultHandler(undefined, result), - error => resultHandler(error) - ); + if (operation == null) throw new MongoRuntimeError(`Unknown batchType: ${batch.batchType}`); + + let thrownError = null; + let result; + try { + result = await executeOperation(bulkOperation.s.collection.client, operation); + } catch (error) { + thrownError = error; + } + + if (thrownError != null) { + if (thrownError instanceof MongoWriteConcernError) { + mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result); + const writeResult = new BulkWriteResult( + bulkOperation.s.bulkResult, + bulkOperation.isOrdered + ); + + throw new MongoBulkWriteError( + { + message: thrownError.result.writeConcernError.errmsg, + code: thrownError.result.writeConcernError.code + }, + writeResult + ); + } else { + // Error is a driver related error not a bulk op error, return early + throw new MongoBulkWriteError( + thrownError, + new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered) + ); + } } - } catch (err) { - // Force top level error - err.ok = 0; - // Merge top level error and return - mergeBatchResults(batch, bulkOperation.s.bulkResult, err, undefined); - callback(); + + mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result); + const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); + bulkOperation.handleWriteError(writeResult); } -} -function handleMongoWriteConcernError( - batch: Batch, - bulkResult: BulkResult, - isOrdered: boolean, - err: MongoWriteConcernError, - callback: Callback -) { - mergeBatchResults(batch, bulkResult, undefined, err.result); - - callback( - new MongoBulkWriteError( - { - message: err.result.writeConcernError.errmsg, - code: err.result.writeConcernError.code - }, - new BulkWriteResult(bulkResult, isOrdered) - ) - ); + bulkOperation.s.batches.length = 0; + + const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); + bulkOperation.handleWriteError(writeResult); + return writeResult; } /** @@ -875,8 +844,6 @@ export interface BulkWriteOptions extends CommandOperationOptions { let?: Document; } -const executeCommandsAsync = promisify(executeCommands); - /** * TODO(NODE-4063) * BulkWrites merge complexity is implemented in executeCommands @@ -895,7 +862,7 @@ export class BulkWriteShimOperation extends AbstractOperation { return 'bulkWrite' as const; } - execute(_server: Server, session: ClientSession | undefined): Promise { + async execute(_server: Server, session: ClientSession | undefined): Promise { if (this.options.session == null) { // An implicit session could have been created by 'executeOperation' // So if we stick it on finalOptions here, each bulk operation @@ -903,7 +870,7 @@ export class BulkWriteShimOperation extends AbstractOperation { // an explicit session would be this.options.session = session; } - return executeCommandsAsync(this.bulkOperation, this.options); + return await executeCommands(this.bulkOperation, this.options); } } @@ -1239,33 +1206,26 @@ export abstract class BulkOperationBase { * Handles the write error before executing commands * @internal */ - handleWriteError(callback: Callback, writeResult: BulkWriteResult): boolean { + handleWriteError(writeResult: BulkWriteResult): void { if (this.s.bulkResult.writeErrors.length > 0) { const msg = this.s.bulkResult.writeErrors[0].errmsg ? this.s.bulkResult.writeErrors[0].errmsg : 'write operation failed'; - callback( - new MongoBulkWriteError( - { - message: msg, - code: this.s.bulkResult.writeErrors[0].code, - writeErrors: this.s.bulkResult.writeErrors - }, - writeResult - ) + throw new MongoBulkWriteError( + { + message: msg, + code: this.s.bulkResult.writeErrors[0].code, + writeErrors: this.s.bulkResult.writeErrors + }, + writeResult ); - - return true; } const writeConcernError = writeResult.getWriteConcernError(); if (writeConcernError) { - callback(new MongoBulkWriteError(writeConcernError, writeResult)); - return true; + throw new MongoBulkWriteError(writeConcernError, writeResult); } - - return false; } abstract addToOperationsList( diff --git a/src/bulk/unordered.ts b/src/bulk/unordered.ts index 3b7e7f2f35..97a134613b 100644 --- a/src/bulk/unordered.ts +++ b/src/bulk/unordered.ts @@ -4,7 +4,6 @@ import type { Collection } from '../collection'; import { MongoInvalidArgumentError } from '../error'; import type { DeleteStatement } from '../operations/delete'; import type { UpdateStatement } from '../operations/update'; -import { type Callback } from '../utils'; import { Batch, BatchType, @@ -20,12 +19,12 @@ export class UnorderedBulkOperation extends BulkOperationBase { super(collection, options, false); } - override handleWriteError(callback: Callback, writeResult: BulkWriteResult): boolean { + override handleWriteError(writeResult: BulkWriteResult): void { if (this.s.batches.length) { - return false; + return; } - return super.handleWriteError(callback, writeResult); + return super.handleWriteError(writeResult); } addToOperationsList( diff --git a/test/integration/crud/crud_api.test.ts b/test/integration/crud/crud_api.test.ts index a391e1c448..94610462a2 100644 --- a/test/integration/crud/crud_api.test.ts +++ b/test/integration/crud/crud_api.test.ts @@ -7,8 +7,8 @@ import { Collection, CommandFailedEvent, CommandSucceededEvent, + MongoBulkWriteError, type MongoClient, - MongoError, MongoServerError, ObjectId, ReturnDocument @@ -1093,14 +1093,8 @@ describe('CRUD API', function () { } }); - it('should correctly throw error on illegal callback when unordered bulkWrite encounters error', { - // Add a tag that our runner can trigger on - // in this case we are setting that node needs to be higher than 0.10.X to run - metadata: { - requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] } - }, - - test: async function () { + describe('when performing a multi-batch unordered bulk write that has a duplicate key', function () { + it('throws a MongoBulkWriteError indicating the duplicate key document failed', async function () { const ops = []; // Create a set of operations that go over the 1000 limit causing two messages let i = 0; @@ -1108,7 +1102,7 @@ describe('CRUD API', function () { ops.push({ insertOne: { _id: i, a: i } }); } - ops.push({ insertOne: { _id: 0, a: i } }); + ops[500] = { insertOne: { _id: 0, a: i } }; const db = client.db(); @@ -1117,8 +1111,12 @@ describe('CRUD API', function () { .bulkWrite(ops, { ordered: false, writeConcern: { w: 1 } }) .catch(error => error); - expect(error).to.be.instanceOf(MongoError); - } + expect(error).to.be.instanceOf(MongoBulkWriteError); + // 1004 because one of them is duplicate key + // but since it is unordered we continued to write + expect(error).to.have.property('insertedCount', 1004); + expect(error.writeErrors[0]).to.have.nested.property('err.index', 500); + }); }); it('should correctly throw error on illegal callback when ordered bulkWrite encounters error', {