diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index cac7e8f493..da08f1a1a6 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -209,6 +209,13 @@ export abstract class AbstractCursor< } } + /** + * The cursor has no id until it receives a response from the initial cursor creating command. + * + * It is non-zero for as long as the database has an open cursor. + * + * The initiating command may receive a zero id if the entire result is in the `firstBatch`. + */ get id(): Long | undefined { return this.cursorId ?? undefined; } @@ -249,10 +256,17 @@ export abstract class AbstractCursor< this.cursorSession = clientSession; } + /** + * The cursor is closed and all remaining locally buffered documents have been iterated. + */ get closed(): boolean { - return this.isClosed; + return this.isClosed && (this.documents?.length ?? 0) === 0; } + /** + * A `killCursors` command was attempted on this cursor. + * This is performed if the cursor id is non zero. + */ get killed(): boolean { return this.isKilled; } @@ -294,7 +308,7 @@ export abstract class AbstractCursor< return; } - if (this.closed && (this.documents?.length ?? 0) === 0) { + if (this.closed) { return; } @@ -752,9 +766,11 @@ export abstract class AbstractCursor< !session.hasEnded ) { this.isKilled = true; + const cursorId = this.cursorId; + this.cursorId = Long.ZERO; await executeOperation( this.cursorClient, - new KillCursorsOperation(this.cursorId, this.cursorNamespace, this.selectedServer, { + new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, { session }) ); diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 9268288782..3fa1e792ed 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1060,7 +1060,8 @@ describe('Change Streams', function () { await changeStreamIterator.next(); await changeStreamIterator.return(); expect(changeStream.closed).to.be.true; - expect(changeStream.cursor).property('closed', true); + expect(changeStream.cursor).property('isClosed', true); + expect(changeStream.cursor).nested.property('session.hasEnded', true); } ); diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index 65d9342676..bee2333db9 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -9,6 +9,7 @@ import { type FindCursor, MongoAPIError, type MongoClient, + MongoCursorExhaustedError, MongoServerError } from '../../mongodb'; @@ -193,7 +194,9 @@ describe('class AbstractCursor', function () { const error = await cursor.toArray().catch(e => e); expect(error).be.instanceOf(MongoAPIError); - expect(cursor.closed).to.be.true; + expect(cursor.id.isZero()).to.be.true; + // The first batch exhausted the cursor, the only thing to clean up is the session + expect(cursor.session.hasEnded).to.be.true; }); }); @@ -225,7 +228,9 @@ describe('class AbstractCursor', function () { } } catch (error) { expect(error).to.be.instanceOf(MongoAPIError); - expect(cursor.closed).to.be.true; + expect(cursor.id.isZero()).to.be.true; + // The first batch exhausted the cursor, the only thing to clean up is the session + expect(cursor.session.hasEnded).to.be.true; } }); }); @@ -259,7 +264,9 @@ describe('class AbstractCursor', function () { const error = await cursor.forEach(iterator).catch(e => e); expect(error).to.be.instanceOf(MongoAPIError); - expect(cursor.closed).to.be.true; + expect(cursor.id.isZero()).to.be.true; + // The first batch exhausted the cursor, the only thing to clean up is the session + expect(cursor.session.hasEnded).to.be.true; }); }); }); @@ -299,4 +306,59 @@ describe('class AbstractCursor', function () { expect(error).to.be.instanceof(MongoServerError); }); }); + + describe('cursor end state', function () { + let client: MongoClient; + let cursor: FindCursor; + + beforeEach(async function () { + client = this.configuration.newClient(); + const test = client.db().collection('test'); + await test.deleteMany({}); + await test.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }]); + }); + + afterEach(async function () { + await cursor.close(); + await client.close(); + }); + + describe('when the last batch has been received', () => { + it('has a zero id and is not closed and is never killed', async function () { + cursor = client.db().collection('test').find({}); + expect(cursor).to.have.property('closed', false); + await cursor.tryNext(); + expect(cursor.id.isZero()).to.be.true; + expect(cursor).to.have.property('closed', false); + expect(cursor).to.have.property('killed', false); + }); + }); + + describe('when the last document has been iterated', () => { + it('has a zero id and is closed and is never killed', async function () { + cursor = client.db().collection('test').find({}); + await cursor.next(); + await cursor.next(); + await cursor.next(); + await cursor.next(); + expect(await cursor.next()).to.be.null; + expect(cursor.id.isZero()).to.be.true; + expect(cursor).to.have.property('closed', true); + expect(cursor).to.have.property('killed', false); + }); + }); + + describe('when some documents have been iterated and the cursor is closed', () => { + it('has a zero id and is not closed and is killed', async function () { + cursor = client.db().collection('test').find({}, { batchSize: 2 }); + await cursor.next(); + await cursor.close(); + expect(cursor).to.have.property('closed', false); + expect(cursor).to.have.property('killed', true); + expect(cursor.id.isZero()).to.be.true; + const error = await cursor.next().catch(error => error); + expect(error).to.be.instanceOf(MongoCursorExhaustedError); + }); + }); + }); }); diff --git a/test/integration/node-specific/cursor_async_iterator.test.js b/test/integration/node-specific/cursor_async_iterator.test.js index 78330ff09c..1777324abc 100644 --- a/test/integration/node-specific/cursor_async_iterator.test.js +++ b/test/integration/node-specific/cursor_async_iterator.test.js @@ -95,7 +95,7 @@ describe('Cursor Async Iterator Tests', function () { } expect(count).to.equal(1); - expect(cursor.closed).to.be.true; + expect(cursor.killed).to.be.true; }); it('cleans up cursor when breaking out of for await of loops', async function () { @@ -106,7 +106,8 @@ describe('Cursor Async Iterator Tests', function () { break; } - expect(cursor.closed).to.be.true; + // The expectation is that we have "cleaned" up the cursor on the server side + expect(cursor.killed).to.be.true; }); it('returns when attempting to reuse the cursor after a break', async function () { @@ -118,7 +119,7 @@ describe('Cursor Async Iterator Tests', function () { break; } - expect(cursor.closed).to.be.true; + expect(cursor.killed).to.be.true; for await (const doc of cursor) { expect.fail('Async generator returns immediately if cursor is closed', doc);