diff --git a/global.d.ts b/global.d.ts index 356eb2e45d6..f386849cb90 100644 --- a/global.d.ts +++ b/global.d.ts @@ -44,6 +44,20 @@ declare global { (title: string, metadata: MongoDBMetadataUI, fn: (this: Suite) => void): Mocha.Suite; } + interface ExclusiveSuiteFunction { + (title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test; + (title: string, metadata: MongoDBMetadataUI, fn: Mocha.AsyncFunc): Mocha.Test; + (title: string, metadataAndTest: MetadataAndTest<Mocha.Func>): Mocha.Test; + (title: string, metadataAndTest: MetadataAndTest<Mocha.AsyncFunc>): Mocha.Test; + } + + interface ExclusiveTestFunction { + (title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test; + (title: string, metadata: MongoDBMetadataUI, fn: Mocha.AsyncFunc): Mocha.Test; + (title: string, metadataAndTest: MetadataAndTest<Mocha.Func>): Mocha.Test; + (title: string, metadataAndTest: MetadataAndTest<Mocha.AsyncFunc>): Mocha.Test; + } + interface TestFunction { (title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test; (title: string, metadata: MongoDBMetadataUI, fn: Mocha.AsyncFunc): Mocha.Test; diff --git a/src/change_stream.ts b/src/change_stream.ts index e268097bd63..e2056a49eb0 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -2,15 +2,11 @@ import Denque = require('denque'); import type { Readable } from 'stream'; import { setTimeout } from 'timers'; -import type { Binary, Document, Long, Timestamp } from './bson'; +import type { Binary, Document, Timestamp } from './bson'; import { Collection } from './collection'; import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants'; -import { - AbstractCursor, - AbstractCursorEvents, - AbstractCursorOptions, - CursorStreamOptions -} from './cursor/abstract_cursor'; +import type { AbstractCursorEvents, CursorStreamOptions } from './cursor/abstract_cursor'; +import { ChangeStreamCursor, ChangeStreamCursorOptions } from './cursor/change_stream_cursor'; import { Db } from './db'; import { AnyError, @@
-20,13 +16,12 @@ import { MongoRuntimeError } from './error'; import { MongoClient } from './mongo_client'; -import { InferIdType, TODO_NODE_3286, TypedEventEmitter } from './mongo_types'; -import { AggregateOperation, AggregateOptions } from './operations/aggregate'; +import { InferIdType, TypedEventEmitter } from './mongo_types'; +import type { AggregateOptions } from './operations/aggregate'; import type { CollationOptions, OperationParent } from './operations/command'; -import { executeOperation, ExecutionResult } from './operations/execute_operation'; import type { ReadPreference } from './read_preference'; import type { Topology } from './sdam/topology'; -import type { ClientSession, ServerSessionId } from './sessions'; +import type { ServerSessionId } from './sessions'; import { calculateDurationInMs, Callback, @@ -111,18 +106,6 @@ export interface PipeOptions { end?: boolean; } -/** @internal */ -export type ChangeStreamAggregateRawResult = { - $clusterTime: { clusterTime: Timestamp }; - cursor: { - postBatchResumeToken: ResumeToken; - ns: string; - id: number | Long; - } & ({ firstBatch: TChange[] } | { nextBatch: TChange[] }); - ok: 1; - operationTime: Timestamp; -}; - /** * Options that can be passed to a ChangeStream. Note that startAfter, resumeAfter, and startAtOperationTime are all mutually exclusive, and the server will error if more than one is specified. 
* @public @@ -700,6 +683,21 @@ export class ChangeStream< }); } + /** + * Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned + */ + tryNext(): Promise; + tryNext(callback: Callback): void; + tryNext(callback?: Callback): Promise | void { + this._setIsIterator(); + return maybePromise(callback, cb => { + this._getCursor((err, cursor) => { + if (err || !cursor) return cb(err); // failed to resume, raise an error + return cursor.tryNext(cb); + }); + }); + } + /** Is the cursor closed */ get closed(): boolean { return this[kClosed] || (this.cursor?.closed ?? false); @@ -733,21 +731,6 @@ export class ChangeStream< return this.cursor.stream(options); } - /** - * Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned - */ - tryNext(): Promise; - tryNext(callback: Callback): void; - tryNext(callback?: Callback): Promise | void { - this._setIsIterator(); - return maybePromise(callback, cb => { - this._getCursor((err, cursor) => { - if (err || !cursor) return cb(err); // failed to resume, raise an error - return cursor.tryNext(cb); - }); - }); - } - /** @internal */ private _setIsEmitter(): void { if (this[kMode] === 'iterator') { @@ -923,15 +906,6 @@ export class ChangeStream< this._processResumeQueue(); }; - // otherwise, raise an error and close the change stream - const unresumableError = (err: AnyError) => { - if (!callback) { - this.emit(ChangeStream.ERROR, err); - } - - this.close(() => this._processResumeQueue(err)); - }; - if (cursor && isResumableError(error, maxWireVersion(cursor.server))) { this.cursor = undefined; @@ -944,7 +918,7 @@ export class ChangeStream< const topology = getTopology(this.parent); this._waitForTopologyConnected(topology, { readPreference: cursor.readPreference }, err => { // if the topology can't reconnect, close the stream - if (err) return unresumableError(err); + if (err) return this._closeWithError(err, callback); // 
create a new cursor, preserving the old cursor's options const newCursor = this._createChangeStreamCursor(cursor.resumeOptions); @@ -955,7 +929,7 @@ export class ChangeStream< // attempt to continue in iterator mode newCursor.hasNext(err => { // if there's an error immediately after resuming, close the stream - if (err) return unresumableError(err); + if (err) return this._closeWithError(err); resumeWithCursor(newCursor); }); }); @@ -1010,158 +984,3 @@ export class ChangeStream< } } } - -/** @internal */ -export interface ChangeStreamCursorOptions extends AbstractCursorOptions { - startAtOperationTime?: OperationTime; - resumeAfter?: ResumeToken; - startAfter?: ResumeToken; - maxAwaitTimeMS?: number; - collation?: CollationOptions; - fullDocument?: string; -} - -/** @internal */ -export class ChangeStreamCursor< - TSchema extends Document = Document, - TChange extends Document = ChangeStreamDocument -> extends AbstractCursor { - _resumeToken: ResumeToken; - startAtOperationTime?: OperationTime; - hasReceived?: boolean; - resumeAfter: ResumeToken; - startAfter: ResumeToken; - options: ChangeStreamCursorOptions; - - postBatchResumeToken?: ResumeToken; - pipeline: Document[]; - - constructor( - client: MongoClient, - namespace: MongoDBNamespace, - pipeline: Document[] = [], - options: ChangeStreamCursorOptions = {} - ) { - super(client, namespace, options); - - this.pipeline = pipeline; - this.options = options; - this._resumeToken = null; - this.startAtOperationTime = options.startAtOperationTime; - - if (options.startAfter) { - this.resumeToken = options.startAfter; - } else if (options.resumeAfter) { - this.resumeToken = options.resumeAfter; - } - } - - set resumeToken(token: ResumeToken) { - this._resumeToken = token; - this.emit(ChangeStream.RESUME_TOKEN_CHANGED, token); - } - - get resumeToken(): ResumeToken { - return this._resumeToken; - } - - get resumeOptions(): ChangeStreamCursorOptions { - const options: ChangeStreamCursorOptions = { - ...this.options - }; 
- - for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime'] as const) { - delete options[key]; - } - - if (this.resumeToken != null) { - if (this.options.startAfter && !this.hasReceived) { - options.startAfter = this.resumeToken; - } else { - options.resumeAfter = this.resumeToken; - } - } else if (this.startAtOperationTime != null && maxWireVersion(this.server) >= 7) { - options.startAtOperationTime = this.startAtOperationTime; - } - - return options; - } - - cacheResumeToken(resumeToken: ResumeToken): void { - if (this.bufferedCount() === 0 && this.postBatchResumeToken) { - this.resumeToken = this.postBatchResumeToken; - } else { - this.resumeToken = resumeToken; - } - this.hasReceived = true; - } - - _processBatch(response: ChangeStreamAggregateRawResult): void { - const cursor = response.cursor; - if (cursor.postBatchResumeToken) { - this.postBatchResumeToken = response.cursor.postBatchResumeToken; - - const batch = - 'firstBatch' in response.cursor ? response.cursor.firstBatch : response.cursor.nextBatch; - if (batch.length === 0) { - this.resumeToken = cursor.postBatchResumeToken; - } - } - } - - clone(): AbstractCursor { - return new ChangeStreamCursor(this.client, this.namespace, this.pipeline, { - ...this.cursorOptions - }); - } - - _initialize(session: ClientSession, callback: Callback): void { - const aggregateOperation = new AggregateOperation(this.namespace, this.pipeline, { - ...this.cursorOptions, - ...this.options, - session - }); - - executeOperation>( - session.client, - aggregateOperation, - (err, response) => { - if (err || response == null) { - return callback(err); - } - - const server = aggregateOperation.server; - if ( - this.startAtOperationTime == null && - this.resumeAfter == null && - this.startAfter == null && - maxWireVersion(server) >= 7 - ) { - this.startAtOperationTime = response.operationTime; - } - - this._processBatch(response); - - this.emit(ChangeStream.INIT, response); - this.emit(ChangeStream.RESPONSE); - - // 
TODO: NODE-2882 - callback(undefined, { server, session, response }); - } - ); - } - - override _getMore(batchSize: number, callback: Callback): void { - super._getMore(batchSize, (err, response) => { - if (err) { - return callback(err); - } - - this._processBatch(response as TODO_NODE_3286 as ChangeStreamAggregateRawResult); - - this.emit(ChangeStream.MORE, response); - this.emit(ChangeStream.RESPONSE); - callback(err, response); - }); - } -} diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 851f1d5f051..ceed2a53d06 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -264,7 +264,7 @@ export abstract class AbstractCursor< stream(options?: CursorStreamOptions): Readable & AsyncIterable { if (options?.transform) { const transform = options.transform; - const readable = makeCursorStream(this); + const readable = new ReadableCursorStream(this); return readable.pipe( new Transform({ @@ -282,7 +282,7 @@ export abstract class AbstractCursor< ); } - return makeCursorStream(this); + return new ReadableCursorStream(this); } hasNext(): Promise; @@ -714,7 +714,21 @@ function nextDocument(cursor: AbstractCursor): T | null { return null; } -function next(cursor: AbstractCursor, blocking: boolean, callback: Callback): void { +/** + * @param cursor - the cursor on which to call `next` + * @param blocking - a boolean indicating whether or not the cursor should `block` until data + * is available. Generally, this flag is set to `false` because if the getMore returns no documents, + * the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and + * `tryNext`, for example) blocking is necessary because a getMore returning no documents does + * not indicate the end of the cursor. 
+ * @param callback - callback to return the result to the caller + * @returns + */ +export function next( + cursor: AbstractCursor, + blocking: boolean, + callback: Callback +): void { const cursorId = cursor[kId]; if (cursor.closed) { return callback(undefined, null); @@ -844,50 +858,41 @@ export function assertUninitialized(cursor: AbstractCursor): void { } } -function makeCursorStream(cursor: AbstractCursor) { - const readable = new Readable({ - objectMode: true, - autoDestroy: false, - highWaterMark: 1 - }); - - let initialized = false; - let reading = false; - let needToClose = true; // NOTE: we must close the cursor if we never read from it, use `_construct` in future node versions +class ReadableCursorStream extends Readable { + private _cursor: AbstractCursor; + private _readInProgress = false; - readable._read = function () { - if (initialized === false) { - needToClose = false; - initialized = true; - } - - if (!reading) { - reading = true; - readNext(); - } - }; + constructor(cursor: AbstractCursor) { + super({ + objectMode: true, + autoDestroy: false, + highWaterMark: 1 + }); + this._cursor = cursor; + } - readable._destroy = function (error, cb) { - if (needToClose) { - cursor.close(err => process.nextTick(cb, err || error)); - } else { - cb(error); + // eslint-disable-next-line @typescript-eslint/no-unused-vars + override _read(size: number): void { + if (!this._readInProgress) { + this._readInProgress = true; + this._readNext(); } - }; + } - function readNext() { - needToClose = false; - next(cursor, true, (err, result) => { - needToClose = err ? !cursor.closed : result != null; + override _destroy(error: Error | null, callback: (error?: Error | null) => void): void { + this._cursor.close(err => process.nextTick(callback, err || error)); + } + private _readNext() { + next(this._cursor, true, (err, result) => { if (err) { // NOTE: This is questionable, but we have a test backing the behavior. 
It seems the // desired behavior is that a stream ends cleanly when a user explicitly closes // a client during iteration. Alternatively, we could do the "right" thing and // propagate the error message by removing this special case. if (err.message.match(/server is closed/)) { - cursor.close(); - return readable.push(null); + this._cursor.close(); + return this.push(null); } // NOTE: This is also perhaps questionable. The rationale here is that these errors tend @@ -896,25 +901,23 @@ function makeCursorStream(cursor: AbstractCursor) { // that changed to happen in cleanup legitimate errors would not destroy the // stream. There are change streams test specifically test these cases. if (err.message.match(/interrupted/)) { - return readable.push(null); + return this.push(null); } - return readable.destroy(err); + return this.destroy(err); } if (result == null) { - readable.push(null); - } else if (readable.destroyed) { - cursor.close(); + this.push(null); + } else if (this.destroyed) { + this._cursor.close(); } else { - if (readable.push(result)) { - return readNext(); + if (this.push(result)) { + return this._readNext(); } - reading = false; + this._readInProgress = false; } }); } - - return readable; } diff --git a/src/cursor/change_stream_cursor.ts b/src/cursor/change_stream_cursor.ts new file mode 100644 index 00000000000..b75c38f18d5 --- /dev/null +++ b/src/cursor/change_stream_cursor.ts @@ -0,0 +1,184 @@ +import type { Document, Long, Timestamp } from '../bson'; +import { + type ChangeStreamDocument, + type ChangeStreamEvents, + type OperationTime, + type ResumeToken, + ChangeStream +} from '../change_stream'; +import { INIT, RESPONSE } from '../constants'; +import type { MongoClient } from '../mongo_client'; +import type { TODO_NODE_3286 } from '../mongo_types'; +import { AggregateOperation } from '../operations/aggregate'; +import type { CollationOptions } from '../operations/command'; +import { type ExecutionResult, executeOperation } from 
'../operations/execute_operation'; +import type { ClientSession } from '../sessions'; +import { type Callback, type MongoDBNamespace, maxWireVersion } from '../utils'; +import { type AbstractCursorOptions, AbstractCursor } from './abstract_cursor'; + +/** @internal */ +export interface ChangeStreamCursorOptions extends AbstractCursorOptions { + startAtOperationTime?: OperationTime; + resumeAfter?: ResumeToken; + startAfter?: ResumeToken; + maxAwaitTimeMS?: number; + collation?: CollationOptions; + fullDocument?: string; +} + +/** @internal */ +export type ChangeStreamAggregateRawResult = { + $clusterTime: { clusterTime: Timestamp }; + cursor: { + postBatchResumeToken: ResumeToken; + ns: string; + id: number | Long; + } & ({ firstBatch: TChange[] } | { nextBatch: TChange[] }); + ok: 1; + operationTime: Timestamp; +}; + +/** @internal */ +export class ChangeStreamCursor< + TSchema extends Document = Document, + TChange extends Document = ChangeStreamDocument +> extends AbstractCursor { + _resumeToken: ResumeToken; + startAtOperationTime?: OperationTime; + hasReceived?: boolean; + resumeAfter: ResumeToken; + startAfter: ResumeToken; + options: ChangeStreamCursorOptions; + + postBatchResumeToken?: ResumeToken; + pipeline: Document[]; + + constructor( + client: MongoClient, + namespace: MongoDBNamespace, + pipeline: Document[] = [], + options: ChangeStreamCursorOptions = {} + ) { + super(client, namespace, options); + + this.pipeline = pipeline; + this.options = options; + this._resumeToken = null; + this.startAtOperationTime = options.startAtOperationTime; + + if (options.startAfter) { + this.resumeToken = options.startAfter; + } else if (options.resumeAfter) { + this.resumeToken = options.resumeAfter; + } + } + + set resumeToken(token: ResumeToken) { + this._resumeToken = token; + this.emit(ChangeStream.RESUME_TOKEN_CHANGED, token); + } + + get resumeToken(): ResumeToken { + return this._resumeToken; + } + + get resumeOptions(): ChangeStreamCursorOptions { + const 
options: ChangeStreamCursorOptions = { + ...this.options + }; + + for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime'] as const) { + delete options[key]; + } + + if (this.resumeToken != null) { + if (this.options.startAfter && !this.hasReceived) { + options.startAfter = this.resumeToken; + } else { + options.resumeAfter = this.resumeToken; + } + } else if (this.startAtOperationTime != null && maxWireVersion(this.server) >= 7) { + options.startAtOperationTime = this.startAtOperationTime; + } + + return options; + } + + cacheResumeToken(resumeToken: ResumeToken): void { + if (this.bufferedCount() === 0 && this.postBatchResumeToken) { + this.resumeToken = this.postBatchResumeToken; + } else { + this.resumeToken = resumeToken; + } + this.hasReceived = true; + } + + _processBatch(response: ChangeStreamAggregateRawResult): void { + const cursor = response.cursor; + if (cursor.postBatchResumeToken) { + this.postBatchResumeToken = response.cursor.postBatchResumeToken; + + const batch = + 'firstBatch' in response.cursor ? 
response.cursor.firstBatch : response.cursor.nextBatch; + if (batch.length === 0) { + this.resumeToken = cursor.postBatchResumeToken; + } + } + } + + clone(): AbstractCursor { + return new ChangeStreamCursor(this.client, this.namespace, this.pipeline, { + ...this.cursorOptions + }); + } + + _initialize(session: ClientSession, callback: Callback): void { + const aggregateOperation = new AggregateOperation(this.namespace, this.pipeline, { + ...this.cursorOptions, + ...this.options, + session + }); + + executeOperation>( + session.client, + aggregateOperation, + (err, response) => { + if (err || response == null) { + return callback(err); + } + + const server = aggregateOperation.server; + if ( + this.startAtOperationTime == null && + this.resumeAfter == null && + this.startAfter == null && + maxWireVersion(server) >= 7 + ) { + this.startAtOperationTime = response.operationTime; + } + + this._processBatch(response); + + this.emit(INIT, response); + this.emit(RESPONSE); + + // TODO: NODE-2882 + callback(undefined, { server, session, response }); + } + ); + } + + override _getMore(batchSize: number, callback: Callback): void { + super._getMore(batchSize, (err, response) => { + if (err) { + return callback(err); + } + + this._processBatch(response as TODO_NODE_3286 as ChangeStreamAggregateRawResult); + + this.emit(ChangeStream.MORE, response); + this.emit(ChangeStream.RESPONSE); + callback(err, response); + }); + } +} diff --git a/src/index.ts b/src/index.ts index 410af2ee8d5..1e78dc2a676 100644 --- a/src/index.ts +++ b/src/index.ts @@ -29,6 +29,7 @@ export { ObjectId, Timestamp } from './bson'; +export { ChangeStreamCursor } from './cursor/change_stream_cursor'; /** * @public * @deprecated Please use `ObjectId` @@ -170,12 +171,9 @@ export type { OrderedBulkOperation } from './bulk/ordered'; export type { UnorderedBulkOperation } from './bulk/unordered'; export type { ChangeStream, - ChangeStreamAggregateRawResult, ChangeStreamCollModDocument, ChangeStreamCreateDocument, 
ChangeStreamCreateIndexDocument, - ChangeStreamCursor, - ChangeStreamCursorOptions, ChangeStreamDeleteDocument, ChangeStreamDocument, ChangeStreamDocumentCollectionUUID, @@ -259,6 +257,10 @@ export type { } from './cursor/abstract_cursor'; export type { InternalAbstractCursorOptions } from './cursor/abstract_cursor'; export type { AggregationCursorOptions } from './cursor/aggregation_cursor'; +export type { + ChangeStreamAggregateRawResult, + ChangeStreamCursorOptions +} from './cursor/change_stream_cursor'; export type { DbOptions, DbPrivate } from './db'; export type { AutoEncrypter, AutoEncryptionOptions, AutoEncryptionTlsOptions } from './deps'; export type { Encrypter, EncrypterOptions } from './encrypter'; diff --git a/test/unit/change_stream.test.ts b/test/unit/change_stream.test.ts index 8c0be4b2265..ad27ddc2f2d 100644 --- a/test/unit/change_stream.test.ts +++ b/test/unit/change_stream.test.ts @@ -2,7 +2,7 @@ import { Long, Timestamp } from 'bson'; import { expect } from 'chai'; import * as sinon from 'sinon'; -import { ChangeStreamCursor } from '../../src/change_stream'; +import { ChangeStreamCursor } from '../../src/cursor/change_stream_cursor'; import { MongoClient } from '../../src/mongo_client'; import { MongoDBNamespace } from '../../src/utils';