Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connection): add support for Connection.prototype.bulkWrite() with MongoDB server 8.0 #15058

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@ const ChangeStream = require('./cursor/changeStream');
const EventEmitter = require('events').EventEmitter;
const Schema = require('./schema');
const STATES = require('./connectionState');
const MongooseBulkWriteError = require('./error/bulkWriteError');
const MongooseError = require('./error/index');
const ServerSelectionError = require('./error/serverSelection');
const SyncIndexesError = require('./error/syncIndexes');
const applyPlugins = require('./helpers/schema/applyPlugins');
const clone = require('./helpers/clone');
const driver = require('./driver');
const get = require('./helpers/get');
const getDefaultBulkwriteResult = require('./helpers/getDefaultBulkwriteResult');
const immediate = require('./helpers/immediate');
const utils = require('./utils');
const CreateCollectionsError = require('./error/createCollectionsError');
const castBulkWrite = require('./helpers/model/castBulkWrite');
const { modelSymbol } = require('./helpers/symbols');
const isPromise = require('./helpers/isPromise');

const arrayAtomicsSymbol = require('./helpers/symbols').arrayAtomicsSymbol;
const sessionNewDocuments = require('./helpers/symbols').sessionNewDocuments;
Expand Down Expand Up @@ -416,6 +421,178 @@ Connection.prototype.createCollection = async function createCollection(collecti
return this.db.createCollection(collection, options);
};

/**
* _Requires MongoDB Server 8.0 or greater_. Executes bulk write operations across multiple models in a single operation.
* You must specify the `model` for each operation: Mongoose will use `model` for casting and validation, as well as
* determining which collection to apply the operation to.
*
* #### Example:
* const Test = mongoose.model('Test', new Schema({ name: String }));
*
* await db.bulkWrite([
* { model: Test, name: 'insertOne', document: { name: 'test1' } }, // Can specify model as a Model class...
* { model: 'Test', name: 'insertOne', document: { name: 'test2' } } // or as a model name
* ], { ordered: false });
*
* @method bulkWrite
* @param {Array} ops
* @param {Object} [options]
* @param {Boolean} [options.ordered] If false, perform unordered operations. If true, perform ordered operations.
* @param {Session} [options.session] The session to use for the operation.
* @return {Promise}
* @see MongoDB https://www.mongodb.com/docs/manual/reference/command/bulkWrite/#mongodb-dbcommand-dbcmd.bulkWrite
* @api public
*/


Connection.prototype.bulkWrite = async function bulkWrite(ops, options) {
await this._waitForConnect();
options = options || {};

const ordered = options.ordered == null ? true : options.ordered;
const asyncLocalStorage = this.base.transactionAsyncLocalStorage?.getStore();
if ((!options || !options.hasOwnProperty('session')) && asyncLocalStorage?.session != null) {
options = { ...options, session: asyncLocalStorage.session };
}

const now = this.base.now();

let res = null;
if (ordered) {
const opsToSend = [];
for (const op of ops) {
if (typeof op.model !== 'string' && !op.model?.[modelSymbol]) {
throw new MongooseError('Must specify model in Connection.prototype.bulkWrite() operations');
}
const Model = op.model[modelSymbol] ? op.model : this.model(op.model);

if (op.name == null) {
throw new MongooseError('Must specify operation name in Connection.prototype.bulkWrite()');
}
if (!castBulkWrite.cast.hasOwnProperty(op.name)) {
throw new MongooseError(`Unrecognized bulkWrite() operation name ${op.name}`);
}

await castBulkWrite.cast[op.name](Model, op, options, now);
opsToSend.push({ ...op, namespace: Model.namespace() });
}

res = await this.client.bulkWrite(opsToSend, options);
} else {
const validOps = [];
const validOpIndexes = [];
let validationErrors = [];
const asyncValidations = [];
const results = [];
for (let i = 0; i < ops.length; ++i) {
const op = ops[i];
if (typeof op.model !== 'string' && !op.model?.[modelSymbol]) {
const error = new MongooseError('Must specify model in Connection.prototype.bulkWrite() operations');
validationErrors.push({ index: i, error: error });
results[i] = error;
continue;
}
let Model;
try {
Model = op.model[modelSymbol] ? op.model : this.model(op.model);
} catch (error) {
validationErrors.push({ index: i, error: error });
continue;
}
if (op.name == null) {
const error = new MongooseError('Must specify operation name in Connection.prototype.bulkWrite()');
validationErrors.push({ index: i, error: error });
results[i] = error;
continue;
}
if (!castBulkWrite.cast.hasOwnProperty(op.name)) {
const error = new MongooseError(`Unrecognized bulkWrite() operation name ${op.name}`);
validationErrors.push({ index: i, error: error });
results[i] = error;
continue;
}

let maybePromise = null;
try {
maybePromise = castBulkWrite.cast[op.name](Model, op, options, now);
} catch (error) {
validationErrors.push({ index: i, error: error });
results[i] = error;
continue;
}
if (isPromise(maybePromise)) {
asyncValidations.push(
maybePromise.then(
() => {
validOps.push({ ...op, namespace: Model.namespace() });
validOpIndexes.push(i);
},
error => {
validationErrors.push({ index: i, error: error });
results[i] = error;
}
)
);
} else {
validOps.push({ ...op, namespace: Model.namespace() });
validOpIndexes.push(i);
}
}

if (asyncValidations.length > 0) {
await Promise.all(asyncValidations);
}

validationErrors = validationErrors.
sort((v1, v2) => v1.index - v2.index).
map(v => v.error);

if (validOps.length === 0) {
if (options.throwOnValidationError && validationErrors.length) {
throw new MongooseBulkWriteError(
validationErrors,
results,
res,
'bulkWrite'
);
}
return getDefaultBulkwriteResult();
}

let error;
[res, error] = await this.client.bulkWrite(validOps, options).
then(res => ([res, null])).
catch(err => ([null, err]));

if (error) {
if (validationErrors.length > 0) {
error.mongoose = error.mongoose || {};
error.mongoose.validationErrors = validationErrors;
}
}

for (let i = 0; i < validOpIndexes.length; ++i) {
results[validOpIndexes[i]] = null;
}
if (validationErrors.length > 0) {
if (options.throwOnValidationError) {
throw new MongooseBulkWriteError(
validationErrors,
results,
res,
'bulkWrite'
);
} else {
res.mongoose = res.mongoose || {};
res.mongoose.validationErrors = validationErrors;
res.mongoose.results = results;
}
}
}

return res;
};

/**
* Calls `createCollection()` on a models in a series.
*
Expand Down
1 change: 0 additions & 1 deletion lib/drivers/node-mongodb-native/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,6 @@ function _setClient(conn, client, options, dbName) {
}
}


/*!
* Module exports.
*/
Expand Down
Loading