diff --git a/src/change_stream.ts b/src/change_stream.ts index 0772bf701ef..2781484fa3d 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -648,21 +648,21 @@ export class ChangeStream< hasNext(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - try { - const hasNext = await this.cursor.hasNext(); - return hasNext; - } catch (error) { + for (;;) { try { - await this._processErrorIteratorMode(error); const hasNext = await this.cursor.hasNext(); return hasNext; } catch (error) { try { - await this.close(); - } catch { - // We are not concerned with errors from close() + await this._processErrorIteratorMode(error); + } catch (error) { + try { + await this.close(); + } catch { + // We are not concerned with errors from close() + } + throw error; } - throw error; } } }, callback); @@ -675,23 +675,22 @@ export class ChangeStream< next(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - try { - const change = await this.cursor.next(); - const processedChange = this._processChange(change ?? null); - return processedChange; - } catch (error) { + for (;;) { try { - await this._processErrorIteratorMode(error); const change = await this.cursor.next(); const processedChange = this._processChange(change ?? null); return processedChange; } catch (error) { try { - await this.close(); - } catch { - // We are not concerned with errors from close() + await this._processErrorIteratorMode(error); + } catch (error) { + try { + await this.close(); + } catch { + // We are not concerned with errors from close() + } + throw error; } - throw error; } } }, callback); @@ -706,21 +705,21 @@ export class ChangeStream< tryNext(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - try { - const change = await this.cursor.tryNext(); - return change ?? null; - } catch (error) { + for (;;) { try { - await this._processErrorIteratorMode(error); const change = await this.cursor.tryNext(); return change ?? null; } catch (error) { try { - await this.close(); - } catch { - // We are not concerned with errors from close() + await this._processErrorIteratorMode(error); + } catch (error) { + try { + await this.close(); + } catch { + // We are not concerned with errors from close() + } + throw error; } - throw error; } } }, callback); diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index a22e4bcf11d..3b31d499721 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1037,56 +1037,6 @@ describe('Change Streams', function () { }); describe('Change Stream Resume Error Tests', function () { - describe('TODO(NODE-4670): fix consecutive resumes unified tests', function () { - let client: MongoClient; - let changeStream: ChangeStream; - - beforeEach(async function () { - client = this.configuration.newClient(); - await client.connect(); - }); - - afterEach(async function () { - await changeStream.close(); - await client.close(); - }); - - it('should support consecutive resumes', { - metadata: { requires: { topology: 'replicaset', mongodb: '>=4.2' } }, - async test() { - const failCommand: FailPoint = { - configureFailPoint: 'failCommand', - mode: { - times: 2 - }, - data: { - failCommands: ['getMore'], - closeConnection: true - } - }; - - await client.db('admin').command(failCommand); - - const collection = client.db('test_consecutive_resume').collection('collection'); - - changeStream = collection.watch([], { batchSize: 1 }); - - await initIteratorMode(changeStream); - - await collection.insertOne({ name: 'bumpy' }); - await collection.insertOne({ name: 'bumpy' }); - await collection.insertOne({ name: 'bumpy' }); - - await sleep(1000); - - for (let i = 0; i < 3; ++i) { - const change = await changeStream.next(); - expect(change).not.to.be.null; - } - } - }); - }); - it.skip('should continue piping changes after a resumable error', { metadata: { requires: { topology: 'replicaset' } }, test: done => { diff --git a/test/integration/change-streams/change_streams.spec.test.ts b/test/integration/change-streams/change_streams.spec.test.ts index 48a56b99df5..451acc7a2dc 100644 --- a/test/integration/change-streams/change_streams.spec.test.ts +++ b/test/integration/change-streams/change_streams.spec.test.ts @@ -4,9 +4,5 @@ import { loadSpecTests } from '../../spec'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; describe('Change Streams Spec - Unified', function () { - runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')), test => - test.description === 'Test consecutive resume' - ? 'TODO(NODE-4670): fix consecutive resume change stream test' - : false - ); + runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified'))); }); diff --git a/test/integration/unified-test-format/unified_test_format.spec.test.ts b/test/integration/unified-test-format/unified_test_format.spec.test.ts index 9d1699cf541..0af0a0fe686 100644 --- a/test/integration/unified-test-format/unified_test_format.spec.test.ts +++ b/test/integration/unified-test-format/unified_test_format.spec.test.ts @@ -23,10 +23,6 @@ const filter: TestFilter = ({ description }) => { return 'TODO(NODE-3308): failures due unnecessary getMore and killCursors calls in 5.0'; } - if (description === 'Test consecutive resume') { - return 'TODO(NODE-4670): fix consecutive resume change stream test'; - } - if ( process.env.AUTH === 'auth' && [