diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 1d69701916..de53ed1b2e 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -296,8 +296,8 @@ export abstract class AbstractCursor< } /** Returns current buffered documents */ - readBufferedDocuments(number?: number): TSchema[] { - const bufferedDocs: TSchema[] = []; + readBufferedDocuments(number?: number): NonNullable<TSchema>[] { + const bufferedDocs: NonNullable<TSchema>[] = []; const documentsToRead = Math.min( number ?? this.documents?.length ?? 0, this.documents?.length ?? 0 @@ -312,6 +312,7 @@ export abstract class AbstractCursor< return bufferedDocs; } + async *[Symbol.asyncIterator](): AsyncGenerator<TSchema, void, void> { if (this.isClosed) { return; @@ -475,13 +476,22 @@ export abstract class AbstractCursor< * cursor.rewind() can be used to reset the cursor. */ async toArray(): Promise<TSchema[]> { - const array = []; + const array: TSchema[] = []; + // at the end of the loop (since readBufferedDocuments is called) the buffer will be empty + // then, the 'await of' syntax will run a getMore call for await (const document of this) { array.push(document); + const docs = this.readBufferedDocuments(); + if (this.transform != null) { + for (const doc of docs) { + array.push(await this.transformDocument(doc)); + } + } else { + array.push(...docs); + } } return array; } - /** * Add a cursor flag to the cursor * @@ -822,7 +832,7 @@ export abstract class AbstractCursor< } /** @internal */ - private async transformDocument(document: NonNullable<TSchema>): Promise<TSchema> { + private async transformDocument(document: NonNullable<TSchema>): Promise<NonNullable<TSchema>> { if (this.transform == null) return document; try { diff --git a/test/benchmarks/mongoBench/suites/multiBench.js b/test/benchmarks/mongoBench/suites/multiBench.js index ae1f921f94..c6afab962c 100644 --- a/test/benchmarks/mongoBench/suites/multiBench.js +++ b/test/benchmarks/mongoBench/suites/multiBench.js @@ -155,6 +155,67 @@ function makeMultiBench(suite) { }) .teardown(dropDb)
.teardown(disconnectClient) + ) + .benchmark('aggregateAMillionDocumentsAndToArray', benchmark => + benchmark + .taskSize(16) + .setup(makeClient) + .setup(connectClient) + .setup(initDb) + .setup(dropDb) + .task(async function () { + await this.db + .aggregate([ + { $documents: [{}] }, + { + $set: { + field: { + $reduce: { + input: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19], + initialValue: [0], + in: { $concatArrays: ['$$value', '$$value'] } + } + } + } + }, + { $unwind: '$field' }, + { $limit: 1000000 } + ]) + .toArray(); + }) + .teardown(dropDb) + .teardown(disconnectClient) + ) + .benchmark('aggregateAMillionTweetsAndToArray', benchmark => + benchmark + .taskSize(1500) + .setup(makeLoadJSON('tweet.json')) + .setup(makeClient) + .setup(connectClient) + .setup(initDb) + .setup(dropDb) + .task(async function () { + await this.db + .aggregate([ + { $documents: [this.doc] }, + { + $set: { + id: { + $reduce: { + input: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19], + initialValue: [0], + in: { $concatArrays: ['$$value', '$$value'] } + } + } + } + }, + { $unwind: '$id' }, + { $limit: 1000000 } + ]) + .toArray(); + }) + .teardown(dropDb) + .teardown(disconnectClient) ); } diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index bee2333db9..a5e7fba13d 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -5,6 +5,7 @@ import { Transform } from 'stream'; import { inspect } from 'util'; import { + AbstractCursor, type Collection, type FindCursor, MongoAPIError, @@ -361,4 +362,37 @@ describe('class AbstractCursor', function () { }); }); }); + + describe('toArray', () => { + let nextSpy; + let client: MongoClient; + let cursor: AbstractCursor; + let col: Collection; + const numBatches = 10; + const batchSize = 4; + + beforeEach(async function () { + client = 
this.configuration.newClient(); + col = client.db().collection('test'); + await col.deleteMany({}); + for (let i = 0; i < numBatches; i++) { + await col.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }]); + } + nextSpy = sinon.spy(AbstractCursor.prototype, 'next'); + }); + + afterEach(async function () { + sinon.restore(); + await cursor.close(); + await client.close(); + }); + + it('iterates per batch not per document', async () => { + cursor = client.db().collection('test').find({}, { batchSize }); + await cursor.toArray(); + expect(nextSpy.callCount).to.equal(numBatches + 1); + const numDocuments = numBatches * batchSize; + expect(nextSpy.callCount).to.be.lessThan(numDocuments); + }); + }); });