diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index b29c1e07605..4c12f9faa81 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -317,10 +317,14 @@ export abstract class AbstractCursor< }; } + cursorStream(): Readable { + return new ReadableCursorStream(this); + } + stream(options?: CursorStreamOptions): Readable & AsyncIterable { if (options?.transform) { const transform = options.transform; - const readable = new ReadableCursorStream(this); + const readable = this.cursorStream(); return readable.pipe( new Transform({ @@ -338,7 +342,7 @@ export abstract class AbstractCursor< ); } - return new ReadableCursorStream(this); + return this.cursorStream(); } hasNext(): Promise; @@ -857,9 +861,10 @@ export function assertUninitialized(cursor: AbstractCursor): void { } } -class ReadableCursorStream extends Readable { - private _cursor: AbstractCursor; - private _readInProgress = false; +/** @internal */ +export class ReadableCursorStream extends Readable { + _cursor: AbstractCursor; + _readInProgress = false; constructor(cursor: AbstractCursor) { super({ @@ -882,7 +887,7 @@ class ReadableCursorStream extends Readable { this._cursor.close(err => process.nextTick(callback, err || error)); } - private _readNext() { + _readNext() { next(this._cursor, true, (err, result) => { if (err) { // NOTE: This is questionable, but we have a test backing the behavior. It seems the @@ -903,6 +908,14 @@ class ReadableCursorStream extends Readable { return this.push(null); } + // NOTE: The two above checks on the message of the error will cause a null to be pushed + // to the stream, thus closing the stream before the destroy call happens. This means + // that either of those error messages on a change stream will not get a proper + // 'error' event to be emitted (the error passed to destroy). Change stream resumability + // relies on that error event to be emitted to create its new cursor and thus was not + // working on 4.4 servers because the error emitted on failover was "interrupted at + // shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down". + // See NODE-4475 for why change stream cursors have their own readable streams now. return this.destroy(err); } diff --git a/src/cursor/change_stream_cursor.ts b/src/cursor/change_stream_cursor.ts index b5a4ae46307..099877399c8 100644 --- a/src/cursor/change_stream_cursor.ts +++ b/src/cursor/change_stream_cursor.ts @@ -1,3 +1,5 @@ +import type { Readable } from 'stream'; + import type { Document, Long, Timestamp } from '../bson'; import { type ChangeStreamDocument, @@ -14,7 +16,12 @@ import type { CollationOptions } from '../operations/command'; import { type ExecutionResult, executeOperation } from '../operations/execute_operation'; import type { ClientSession } from '../sessions'; import { type Callback, type MongoDBNamespace, maxWireVersion } from '../utils'; -import { type AbstractCursorOptions, AbstractCursor } from './abstract_cursor'; +import { + type AbstractCursorOptions, + AbstractCursor, + next, + ReadableCursorStream +} from './abstract_cursor'; /** @internal */ export interface ChangeStreamCursorOptions extends AbstractCursorOptions { @@ -111,6 +118,10 @@ export class ChangeStreamCursor< return options; } + override cursorStream(): Readable { + return new ChangeStreamCursorStream(this); + } + cacheResumeToken(resumeToken: ResumeToken): void { if (this.bufferedCount() === 0 && this.postBatchResumeToken) { this.resumeToken = this.postBatchResumeToken; @@ -192,3 +203,26 @@ export class ChangeStreamCursor< }); } } + +/** @internal */ +export class ChangeStreamCursorStream extends ReadableCursorStream { + override _readNext() { + next(this._cursor, true, (err, result) => { + if (err) { + return this.destroy(err); + } + + if (result == null) { + this.push(null); + } else if (this.destroyed) { + this._cursor.close().catch(() => null); + } else { + if (this.push(result)) { + return this._readNext(); + } + + this._readInProgress = false; + } + }); + } +}