Skip to content

Commit

Permalink
fix(NODE-5628): bulkWriteResult.insertedIds does not filter out _ids …
Browse files Browse the repository at this point in the history
…that are not actually inserted (#3867)

Co-authored-by: Durran Jordan <[email protected]>
  • Loading branch information
aditi-khare-mongoDB and durran authored Sep 27, 2023
1 parent 3932260 commit 09f2a67
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 8 deletions.
45 changes: 38 additions & 7 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,17 @@ export class BulkWriteResult {
* Create a new BulkWriteResult instance
* @internal
*/
constructor(bulkResult: BulkResult) {
constructor(bulkResult: BulkResult, isOrdered: boolean) {
this.result = bulkResult;
this.insertedCount = this.result.nInserted ?? 0;
this.matchedCount = this.result.nMatched ?? 0;
this.modifiedCount = this.result.nModified ?? 0;
this.deletedCount = this.result.nRemoved ?? 0;
this.upsertedCount = this.result.upserted.length ?? 0;
this.upsertedIds = BulkWriteResult.generateIdMap(this.result.upserted);
this.insertedIds = BulkWriteResult.generateIdMap(this.result.insertedIds);
this.insertedIds = BulkWriteResult.generateIdMap(
this.getSuccessfullyInsertedIds(bulkResult, isOrdered)
);
Object.defineProperty(this, 'result', { value: this.result, enumerable: false });
}

Expand All @@ -214,6 +216,22 @@ export class BulkWriteResult {
return this.result.ok;
}

/**
* Returns document_ids that were actually inserted
* @internal
*/
private getSuccessfullyInsertedIds(bulkResult: BulkResult, isOrdered: boolean): Document[] {
if (bulkResult.writeErrors.length === 0) return bulkResult.insertedIds;

if (isOrdered) {
return bulkResult.insertedIds.slice(0, bulkResult.writeErrors[0].index);
}

return bulkResult.insertedIds.filter(
({ index }) => !bulkResult.writeErrors.some(writeError => index === writeError.index)
);
}

/** Returns the upserted id at the given index */
getUpsertedIdAt(index: number): Document | undefined {
return this.result.upserted[index];
Expand Down Expand Up @@ -479,7 +497,10 @@ function executeCommands(
callback: Callback<BulkWriteResult>
) {
if (bulkOperation.s.batches.length === 0) {
return callback(undefined, new BulkWriteResult(bulkOperation.s.bulkResult));
return callback(
undefined,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
);
}

const batch = bulkOperation.s.batches.shift() as Batch;
Expand All @@ -488,17 +509,26 @@ function executeCommands(
// Error is a driver related error not a bulk op error, return early
if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) {
return callback(
new MongoBulkWriteError(err, new BulkWriteResult(bulkOperation.s.bulkResult))
new MongoBulkWriteError(
err,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
)
);
}

if (err instanceof MongoWriteConcernError) {
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, err, callback);
return handleMongoWriteConcernError(
batch,
bulkOperation.s.bulkResult,
bulkOperation.isOrdered,
err,
callback
);
}

// Merge the results together
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
if (bulkOperation.handleWriteError(callback, writeResult)) return;

// Execute the next command in line
Expand Down Expand Up @@ -572,6 +602,7 @@ function executeCommands(
function handleMongoWriteConcernError(
batch: Batch,
bulkResult: BulkResult,
isOrdered: boolean,
err: MongoWriteConcernError,
callback: Callback<BulkWriteResult>
) {
Expand All @@ -583,7 +614,7 @@ function handleMongoWriteConcernError(
message: err.result?.writeConcernError.errmsg,
code: err.result?.writeConcernError.result
},
new BulkWriteResult(bulkResult)
new BulkWriteResult(bulkResult, isOrdered)
)
);
}
Expand Down
118 changes: 117 additions & 1 deletion test/integration/crud/bulk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type Collection,
Long,
MongoBatchReExecutionError,
MongoBulkWriteError,
type MongoClient,
MongoDriverError,
MongoInvalidArgumentError
Expand All @@ -31,7 +32,6 @@ describe('Bulk', function () {
.createCollection('test')
.catch(() => null); // make ns exist
});

afterEach(async function () {
const cleanup = this.configuration.newClient();
await cleanup
Expand Down Expand Up @@ -91,6 +91,7 @@ describe('Bulk', function () {
}
});
});

context('when passed a valid document list', function () {
it('insertMany should not throw a MongoInvalidArgument error when called with a valid operation', async function () {
try {
Expand All @@ -104,6 +105,121 @@ describe('Bulk', function () {
}
});
});

context('when inserting duplicate values', function () {
let col;

beforeEach(async function () {
const db = client.db();
col = db.collection('test');
await col.createIndex([{ a: 1 }], { unique: true, sparse: false });
});

async function assertFailsWithDuplicateFields(input, isOrdered, expectedInsertedIds) {
const error = await col.insertMany(input, { ordered: isOrdered }).catch(error => error);
expect(error).to.be.instanceOf(MongoBulkWriteError);
expect(error.result.insertedCount).to.equal(Object.keys(error.result.insertedIds).length);
expect(error.result.insertedIds).to.deep.equal(expectedInsertedIds);
}

context('when the insert is ordered', function () {
it('contains the correct insertedIds on one duplicate insert', async function () {
await assertFailsWithDuplicateFields(
[
{ _id: 0, a: 1 },
{ _id: 1, a: 1 }
],
true,
{ 0: 0 }
);
});

it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ _id: 0, a: 1 },
{ _id: 1, a: 1 },
{ _id: 2, a: 1 },
{ _id: 3, b: 2 }
],
true,
{ 0: 0 }
);
});
});

context('when the insert is unordered', function () {
it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ _id: 0, a: 1 },
{ _id: 1, a: 1 },
{ _id: 2, a: 1 },
{ _id: 3, b: 2 }
],
false,
{ 0: 0, 3: 3 }
);
});
});
});
});

describe('#bulkWrite()', function () {
context('when inserting duplicate values', function () {
let col;

beforeEach(async function () {
const db = client.db();
col = db.collection('test');
await col.createIndex([{ a: 1 }], { unique: true, sparse: false });
});

async function assertFailsWithDuplicateFields(input, isOrdered, expectedInsertedIds) {
const error = await col.bulkWrite(input, { ordered: isOrdered }).catch(error => error);
expect(error).to.be.instanceOf(MongoBulkWriteError);
expect(error.result.insertedCount).to.equal(Object.keys(error.result.insertedIds).length);
expect(error.result.insertedIds).to.deep.equal(expectedInsertedIds);
}

context('when the insert is ordered', function () {
it('contains the correct insertedIds on one duplicate insert', async function () {
await assertFailsWithDuplicateFields(
[{ insertOne: { _id: 0, a: 1 } }, { insertOne: { _id: 1, a: 1 } }],
true,
{ 0: 0 }
);
});

it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ insertOne: { _id: 0, a: 1 } },
{ insertOne: { _id: 1, a: 1 } },
{ insertOne: { _id: 2, a: 1 } },
{ insertOne: { _id: 3, b: 2 } }
],
true,
{ 0: 0 }
);
});
});

context('when the insert is unordered', function () {
it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ insertOne: { _id: 0, a: 1 } },
{ insertOne: { _id: 1, a: 1 } },
{ insertOne: { _id: 2, a: 1 } },
{ insertOne: { _id: 3, b: 2 } }
],
false,
{ 0: 0, 3: 3 }
);
});
});
});
});
});

Expand Down

0 comments on commit 09f2a67

Please sign in to comment.