Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-5628): bulkWriteResult.insertedIds does not filter out _ids that are not actually inserted #3867

Merged
merged 13 commits into from
Sep 27, 2023
Merged
44 changes: 37 additions & 7 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,17 @@ export class BulkWriteResult {
* Create a new BulkWriteResult instance
* @internal
*/
constructor(bulkResult: BulkResult) {
constructor(bulkResult: BulkResult, isOrdered: boolean) {
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
this.result = bulkResult;
this.insertedCount = this.result.nInserted ?? 0;
this.matchedCount = this.result.nMatched ?? 0;
this.modifiedCount = this.result.nModified ?? 0;
this.deletedCount = this.result.nRemoved ?? 0;
this.upsertedCount = this.result.upserted.length ?? 0;
this.upsertedIds = BulkWriteResult.generateIdMap(this.result.upserted);
this.insertedIds = BulkWriteResult.generateIdMap(this.result.insertedIds);
this.insertedIds = BulkWriteResult.generateIdMap(
this.getSuccessfullyInsertedIds(bulkResult, isOrdered)
);
Object.defineProperty(this, 'result', { value: this.result, enumerable: false });
}

Expand All @@ -214,6 +216,21 @@ export class BulkWriteResult {
return this.result.ok;
}

/** Returns document_ids that were actually inserted
* @internal
*/
getSuccessfullyInsertedIds(bulkResult: BulkResult, isOrdered: boolean): Document[] {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
if (bulkResult.writeErrors.length === 0) return bulkResult.insertedIds;

if (isOrdered) {
return bulkResult.insertedIds.slice(0, bulkResult.writeErrors[0].index);
}

return bulkResult.insertedIds.filter(
({ index }) => !bulkResult.writeErrors.some(writeError => index === writeError.index)
);
}

/** Returns the upserted id at the given index */
getUpsertedIdAt(index: number): Document | undefined {
return this.result.upserted[index];
Expand Down Expand Up @@ -479,7 +496,10 @@ function executeCommands(
callback: Callback<BulkWriteResult>
) {
if (bulkOperation.s.batches.length === 0) {
return callback(undefined, new BulkWriteResult(bulkOperation.s.bulkResult));
return callback(
undefined,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
);
}

const batch = bulkOperation.s.batches.shift() as Batch;
Expand All @@ -488,17 +508,26 @@ function executeCommands(
// Error is a driver related error not a bulk op error, return early
if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) {
return callback(
new MongoBulkWriteError(err, new BulkWriteResult(bulkOperation.s.bulkResult))
new MongoBulkWriteError(
err,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
)
);
}

if (err instanceof MongoWriteConcernError) {
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, err, callback);
return handleMongoWriteConcernError(
batch,
bulkOperation.s.bulkResult,
bulkOperation.isOrdered,
err,
callback
);
}

// Merge the results together
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
if (bulkOperation.handleWriteError(callback, writeResult)) return;

// Execute the next command in line
Expand Down Expand Up @@ -572,6 +601,7 @@ function executeCommands(
function handleMongoWriteConcernError(
batch: Batch,
bulkResult: BulkResult,
isOrdered: boolean,
err: MongoWriteConcernError,
callback: Callback<BulkWriteResult>
) {
Expand All @@ -583,7 +613,7 @@ function handleMongoWriteConcernError(
message: err.result?.writeConcernError.errmsg,
code: err.result?.writeConcernError.result
},
new BulkWriteResult(bulkResult)
new BulkWriteResult(bulkResult, isOrdered)
)
);
}
Expand Down
131 changes: 130 additions & 1 deletion test/integration/crud/bulk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type Collection,
Long,
MongoBatchReExecutionError,
MongoBulkWriteError,
type MongoClient,
MongoDriverError,
MongoInvalidArgumentError
Expand All @@ -31,7 +32,6 @@ describe('Bulk', function () {
.createCollection('test')
.catch(() => null); // make ns exist
});

afterEach(async function () {
const cleanup = this.configuration.newClient();
await cleanup
Expand Down Expand Up @@ -104,6 +104,135 @@ describe('Bulk', function () {
}
});
});
context('when inserting duplicate values', function () {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
async function assertFailsWithDuplicateFields(
input,
isOrdered,
indices,
expectedInsertedIds
) {
try {
const db = client.db();
const col = db.collection('test');
for (let i = 0; i < indices.length; i++) {
await col.createIndex(indices[i], { unique: true, sparse: false });
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
}
await col.insertMany(input, { ordered: isOrdered });
expect.fail('a MongoBulkWriteError must be thrown when write errors are encountered');
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
} catch (error) {
expect(error instanceof MongoBulkWriteError).to.equal(true);
durran marked this conversation as resolved.
Show resolved Hide resolved
expect(error.result.insertedCount).to.equal(
Object.keys(error.result.insertedIds).length
);
expect(error.result.insertedIds).to.deep.equal(expectedInsertedIds);
}
}
context('when the insert is ordered', function () {
it('contains the correct insertedIds on one duplicate insert', async function () {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
await assertFailsWithDuplicateFields(
[
{ _id: 0, a: 1 },
{ _id: 1, a: 1 }
],
true,
[{ a: 1 }],
{ 0: 0 }
);
});
it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ _id: 0, a: 1 },
{ _id: 1, a: 1 },
{ _id: 2, a: 1 },
{ _id: 3, b: 2 }
],
true,
[{ a: 1 }],
{ 0: 0 }
);
});
});
context('when the insert is unordered', function () {
it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ _id: 0, a: 1 },
{ _id: 1, a: 1 },
{ _id: 2, a: 1 },
{ _id: 3, b: 2 }
],
false,
[{ a: 1 }],
{ 0: 0, 3: 3 }
);
});
});
});
});
describe('#bulkWrite()', function () {
context('when inserting duplicate values', function () {
async function assertFailsWithDuplicateFields(
input,
isOrdered,
indices,
expectedInsertedIds
) {
try {
const db = client.db();
const col = db.collection('test');
for (let i = 0; i < indices.length; i++) {
await col.createIndex(indices[i], { unique: true, sparse: false });
}
await col.bulkWrite(input, { ordered: isOrdered });
expect.fail('a MongoBulkWriteError must be thrown when write errors are encountered');
} catch (error) {
expect(error instanceof MongoBulkWriteError).to.equal(true);
durran marked this conversation as resolved.
Show resolved Hide resolved
expect(error.result.insertedCount).to.equal(
Object.keys(error.result.insertedIds).length
);
expect(error.result.insertedIds).to.deep.equal(expectedInsertedIds);
}
}
context('when the insert is ordered', function () {
it('contains the correct insertedIds on one duplicate insert', async function () {
await assertFailsWithDuplicateFields(
[{ insertOne: { _id: 0, a: 1 } }, { insertOne: { _id: 1, a: 1 } }],
true,
[{ a: 1 }],
{ 0: 0 }
);
});
it('contains the correct insertedIds on multiple duplicate inserts', async function () {
await assertFailsWithDuplicateFields(
[
{ insertOne: { _id: 0, a: 1 } },
{ insertOne: { _id: 1, a: 1 } },
{ insertOne: { _id: 2, a: 1 } },
{ insertOne: { _id: 3, b: 2 } }
],
true,
[{ a: 1 }],
{ 0: 0 }
);
});
});
context('when the insert is unordered', function () {
it('contains the correct insertedIds on multiple duplicate insertsr', async function () {
await assertFailsWithDuplicateFields(
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
[
{ insertOne: { _id: 0, a: 1 } },
{ insertOne: { _id: 1, a: 1 } },
{ insertOne: { _id: 2, a: 1 } },
{ insertOne: { _id: 3, b: 2 } }
],
false,
[{ a: 1 }],
{ 0: 0, 3: 3 }
);
});
});
});
});
});

Expand Down