Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,8 @@ export abstract class BulkOperationBase {
}

this.s.executed = true;
return executeLegacyOperation(this.s.topology, executeCommands, [this, options, callback]);
const finalOptions = { ...this.s.options, ...options };
return executeLegacyOperation(this.s.topology, executeCommands, [this, finalOptions, callback]);
}

/**
Expand Down
99 changes: 99 additions & 0 deletions test/functional/bulk.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2003,4 +2003,103 @@ describe('Bulk', function () {
);
});
});

describe('Bulk operation transaction rollback', () => {
/** @type {import('../../src/index').MongoClient} */
let client;
/** @type {import('../../src/index').Collection<{ answer: number }>} */
let collection;

beforeEach(async function () {
const config = this.configuration;
client = config.newClient();
await client.connect();

try {
await client
.db('bulk_operation_writes_test')
.collection('bulk_write_transaction_test')
.drop();
} catch (_) {
// do not care
}

collection = await client
.db('bulk_operation_writes_test')
.createCollection('bulk_write_transaction_test');

await collection.deleteMany({});
});

it('should abort ordered/unordered bulk operation writes', {
metadata: { requires: { mongodb: '>= 4.2', topology: ['replicaset'] } },
async test() {
const session = client.startSession();
session.startTransaction({
readConcern: { level: 'local' },
writeConcern: { w: 'majority' }
});

let bulk = undefined;

// 1.
bulk = collection.initializeUnorderedBulkOp({ session });
bulk.insert({ answer: 42 });
await bulk.execute();

// 2.
bulk = collection.initializeOrderedBulkOp({ session });
bulk.insert({ answer: 43 });
await bulk.execute();

await session.abortTransaction();
await session.endSession();

const documents = await collection.find().toArray();

expect(documents).to.be.lengthOf(
0,
'bulk operation writes were made outside of transaction'
);

await client.close(); // session leak checker doesn't let you put the close in an after hook
}
});

it('should abort ordered/unordered bulk operation writes using withTransaction', {
metadata: { requires: { mongodb: '>= 4.2', topology: ['replicaset'] } },
async test() {
const session = client.startSession();

await session.withTransaction(
async () => {
let bulk = undefined;
// 1.
bulk = collection.initializeUnorderedBulkOp({ session });
bulk.insert({ answer: 42 });
await bulk.execute();

// 2.
bulk = collection.initializeOrderedBulkOp({ session });
bulk.insert({ answer: 43 });
await bulk.execute();

await session.abortTransaction();
},
{ readConcern: { level: 'local' }, writeConcern: { w: 'majority' } }
);

await session.endSession();

const documents = await collection.find().toArray();

expect(documents).to.be.lengthOf(
0,
'bulk operation writes were made outside of transaction'
);

await client.close(); // session leak checker doesn't let you put the close in an after hook
}
});
});
});