Skip to content

Commit

Permalink
perf(NODE-5906): optimize toArray to use batches (#4171)
Browse files Browse the repository at this point in the history
Co-authored-by: Neal Beeken <[email protected]>
  • Loading branch information
aditi-khare-mongoDB and nbbeeken authored Aug 7, 2024
1 parent b3f3987 commit 5565d50
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 5 deletions.
20 changes: 15 additions & 5 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ export abstract class AbstractCursor<
}

/** Returns current buffered documents */
readBufferedDocuments(number?: number): TSchema[] {
const bufferedDocs: TSchema[] = [];
readBufferedDocuments(number?: number): NonNullable<TSchema>[] {
const bufferedDocs: NonNullable<TSchema>[] = [];
const documentsToRead = Math.min(
number ?? this.documents?.length ?? 0,
this.documents?.length ?? 0
Expand All @@ -312,6 +312,7 @@ export abstract class AbstractCursor<

return bufferedDocs;
}

async *[Symbol.asyncIterator](): AsyncGenerator<TSchema, void, void> {
if (this.isClosed) {
return;
Expand Down Expand Up @@ -475,13 +476,22 @@ export abstract class AbstractCursor<
* cursor.rewind() can be used to reset the cursor.
*/
async toArray(): Promise<TSchema[]> {
const array = [];
const array: TSchema[] = [];
// at the end of the loop (since readBufferedDocuments is called) the buffer will be empty
// then, the 'await of' syntax will run a getMore call
for await (const document of this) {
array.push(document);
const docs = this.readBufferedDocuments();
if (this.transform != null) {
for (const doc of docs) {
array.push(await this.transformDocument(doc));
}
} else {
array.push(...docs);
}
}
return array;
}

/**
* Add a cursor flag to the cursor
*
Expand Down Expand Up @@ -822,7 +832,7 @@ export abstract class AbstractCursor<
}

/** @internal */
private async transformDocument(document: NonNullable<TSchema>): Promise<TSchema> {
private async transformDocument(document: NonNullable<TSchema>): Promise<NonNullable<TSchema>> {
if (this.transform == null) return document;

try {
Expand Down
61 changes: 61 additions & 0 deletions test/benchmarks/mongoBench/suites/multiBench.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,67 @@ function makeMultiBench(suite) {
})
.teardown(dropDb)
.teardown(disconnectClient)
)
.benchmark('aggregateAMillionDocumentsAndToArray', benchmark =>
benchmark
.taskSize(16)
.setup(makeClient)
.setup(connectClient)
.setup(initDb)
.setup(dropDb)
.task(async function () {
await this.db
.aggregate([
{ $documents: [{}] },
{
$set: {
field: {
$reduce: {
input: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
initialValue: [0],
in: { $concatArrays: ['$$value', '$$value'] }
}
}
}
},
{ $unwind: '$field' },
{ $limit: 1000000 }
])
.toArray();
})
.teardown(dropDb)
.teardown(disconnectClient)
)
.benchmark('aggregateAMillionTweetsAndToArray', benchmark =>
benchmark
.taskSize(1500)
.setup(makeLoadJSON('tweet.json'))
.setup(makeClient)
.setup(connectClient)
.setup(initDb)
.setup(dropDb)
.task(async function () {
await this.db
.aggregate([
{ $documents: [this.doc] },
{
$set: {
id: {
$reduce: {
input: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
initialValue: [0],
in: { $concatArrays: ['$$value', '$$value'] }
}
}
}
},
{ $unwind: '$id' },
{ $limit: 1000000 }
])
.toArray();
})
.teardown(dropDb)
.teardown(disconnectClient)
);
}

Expand Down
34 changes: 34 additions & 0 deletions test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Transform } from 'stream';
import { inspect } from 'util';

import {
AbstractCursor,
type Collection,
type FindCursor,
MongoAPIError,
Expand Down Expand Up @@ -361,4 +362,37 @@ describe('class AbstractCursor', function () {
});
});
});

describe('toArray', () => {
let nextSpy;
let client: MongoClient;
let cursor: AbstractCursor;
let col: Collection;
const numBatches = 10;
const batchSize = 4;

beforeEach(async function () {
client = this.configuration.newClient();
col = client.db().collection('test');
await col.deleteMany({});
for (let i = 0; i < numBatches; i++) {
await col.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }]);
}
nextSpy = sinon.spy(AbstractCursor.prototype, 'next');
});

afterEach(async function () {
sinon.restore();
await cursor.close();
await client.close();
});

it('iterates per batch not per document', async () => {
cursor = client.db().collection('test').find({}, { batchSize });
await cursor.toArray();
expect(nextSpy.callCount).to.equal(numBatches + 1);
const numDocuments = numBatches * batchSize;
expect(nextSpy.callCount).to.be.lessThan(numDocuments);
});
});
});

0 comments on commit 5565d50

Please sign in to comment.