Skip to content
Merged
14 changes: 14 additions & 0 deletions global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ declare global {
(title: string, metadata: MongoDBMetadataUI, fn: (this: Suite) => void): Mocha.Suite;
}

interface ExclusiveSuiteFunction {
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test;
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.AsyncFunc): Mocha.Test;
(title: string, metadataAndTest: MetadataAndTest<Mocha.Func>): Mocha.Test;
(title: string, metadataAndTest: MetadataAndTest<Mocha.AsyncFunc>): Mocha.Test;
}

interface ExclusiveTestFunction {
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test;
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.AsyncFunc): Mocha.Test;
(title: string, metadataAndTest: MetadataAndTest<Mocha.Func>): Mocha.Test;
(title: string, metadataAndTest: MetadataAndTest<Mocha.AsyncFunc>): Mocha.Test;
}

interface TestFunction {
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test;
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.AsyncFunc): Mocha.Test;
Expand Down
227 changes: 23 additions & 204 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@ import Denque = require('denque');
import type { Readable } from 'stream';
import { setTimeout } from 'timers';

import type { Binary, Document, Long, Timestamp } from './bson';
import type { Binary, Document, Timestamp } from './bson';
import { Collection } from './collection';
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
import {
AbstractCursor,
AbstractCursorEvents,
AbstractCursorOptions,
CursorStreamOptions
} from './cursor/abstract_cursor';
import type { AbstractCursorEvents, CursorStreamOptions } from './cursor/abstract_cursor';
import { ChangeStreamCursor, ChangeStreamCursorOptions } from './cursor/change_stream_cursor';
import { Db } from './db';
import {
AnyError,
Expand All @@ -20,13 +16,12 @@ import {
MongoRuntimeError
} from './error';
import { MongoClient } from './mongo_client';
import { InferIdType, TODO_NODE_3286, TypedEventEmitter } from './mongo_types';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import { InferIdType, TypedEventEmitter } from './mongo_types';
import type { AggregateOptions } from './operations/aggregate';
import type { CollationOptions, OperationParent } from './operations/command';
import { executeOperation, ExecutionResult } from './operations/execute_operation';
import type { ReadPreference } from './read_preference';
import type { Topology } from './sdam/topology';
import type { ClientSession, ServerSessionId } from './sessions';
import type { ServerSessionId } from './sessions';
import {
calculateDurationInMs,
Callback,
Expand Down Expand Up @@ -111,18 +106,6 @@ export interface PipeOptions {
end?: boolean;
}

/** @internal */
export type ChangeStreamAggregateRawResult<TChange> = {
$clusterTime: { clusterTime: Timestamp };
cursor: {
postBatchResumeToken: ResumeToken;
ns: string;
id: number | Long;
} & ({ firstBatch: TChange[] } | { nextBatch: TChange[] });
ok: 1;
operationTime: Timestamp;
};

/**
* Options that can be passed to a ChangeStream. Note that startAfter, resumeAfter, and startAtOperationTime are all mutually exclusive, and the server will error if more than one is specified.
* @public
Expand Down Expand Up @@ -700,6 +683,21 @@ export class ChangeStream<
});
}

/**
* Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
*/
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
this._setIsIterator();
return maybePromise(callback, cb => {
this._getCursor((err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
return cursor.tryNext(cb);
});
});
}

/** Is the cursor closed */
get closed(): boolean {
return this[kClosed] || (this.cursor?.closed ?? false);
Expand Down Expand Up @@ -733,21 +731,6 @@ export class ChangeStream<
return this.cursor.stream(options);
}

/**
* Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
*/
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
this._setIsIterator();
return maybePromise(callback, cb => {
this._getCursor((err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
return cursor.tryNext(cb);
});
});
}

/** @internal */
private _setIsEmitter(): void {
if (this[kMode] === 'iterator') {
Expand Down Expand Up @@ -923,15 +906,6 @@ export class ChangeStream<
this._processResumeQueue();
};

// otherwise, raise an error and close the change stream
const unresumableError = (err: AnyError) => {
if (!callback) {
this.emit(ChangeStream.ERROR, err);
}

this.close(() => this._processResumeQueue(err));
};

if (cursor && isResumableError(error, maxWireVersion(cursor.server))) {
this.cursor = undefined;

Expand All @@ -944,7 +918,7 @@ export class ChangeStream<
const topology = getTopology(this.parent);
this._waitForTopologyConnected(topology, { readPreference: cursor.readPreference }, err => {
// if the topology can't reconnect, close the stream
if (err) return unresumableError(err);
if (err) return this._closeWithError(err, callback);

// create a new cursor, preserving the old cursor's options
const newCursor = this._createChangeStreamCursor(cursor.resumeOptions);
Expand All @@ -955,7 +929,7 @@ export class ChangeStream<
// attempt to continue in iterator mode
newCursor.hasNext(err => {
// if there's an error immediately after resuming, close the stream
if (err) return unresumableError(err);
if (err) return this._closeWithError(err);
resumeWithCursor(newCursor);
});
});
Expand Down Expand Up @@ -1010,158 +984,3 @@ export class ChangeStream<
}
}
}

/** @internal */
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
startAtOperationTime?: OperationTime;
resumeAfter?: ResumeToken;
startAfter?: ResumeToken;
maxAwaitTimeMS?: number;
collation?: CollationOptions;
fullDocument?: string;
}

/** @internal */
export class ChangeStreamCursor<
TSchema extends Document = Document,
TChange extends Document = ChangeStreamDocument<TSchema>
> extends AbstractCursor<TChange, ChangeStreamEvents> {
_resumeToken: ResumeToken;
startAtOperationTime?: OperationTime;
hasReceived?: boolean;
resumeAfter: ResumeToken;
startAfter: ResumeToken;
options: ChangeStreamCursorOptions;

postBatchResumeToken?: ResumeToken;
pipeline: Document[];

constructor(
client: MongoClient,
namespace: MongoDBNamespace,
pipeline: Document[] = [],
options: ChangeStreamCursorOptions = {}
) {
super(client, namespace, options);

this.pipeline = pipeline;
this.options = options;
this._resumeToken = null;
this.startAtOperationTime = options.startAtOperationTime;

if (options.startAfter) {
this.resumeToken = options.startAfter;
} else if (options.resumeAfter) {
this.resumeToken = options.resumeAfter;
}
}

set resumeToken(token: ResumeToken) {
this._resumeToken = token;
this.emit(ChangeStream.RESUME_TOKEN_CHANGED, token);
}

get resumeToken(): ResumeToken {
return this._resumeToken;
}

get resumeOptions(): ChangeStreamCursorOptions {
const options: ChangeStreamCursorOptions = {
...this.options
};

for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime'] as const) {
delete options[key];
}

if (this.resumeToken != null) {
if (this.options.startAfter && !this.hasReceived) {
options.startAfter = this.resumeToken;
} else {
options.resumeAfter = this.resumeToken;
}
} else if (this.startAtOperationTime != null && maxWireVersion(this.server) >= 7) {
options.startAtOperationTime = this.startAtOperationTime;
}

return options;
}

cacheResumeToken(resumeToken: ResumeToken): void {
if (this.bufferedCount() === 0 && this.postBatchResumeToken) {
this.resumeToken = this.postBatchResumeToken;
} else {
this.resumeToken = resumeToken;
}
this.hasReceived = true;
}

_processBatch(response: ChangeStreamAggregateRawResult<TChange>): void {
const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.postBatchResumeToken = response.cursor.postBatchResumeToken;

const batch =
'firstBatch' in response.cursor ? response.cursor.firstBatch : response.cursor.nextBatch;
if (batch.length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
}

clone(): AbstractCursor<TChange> {
return new ChangeStreamCursor(this.client, this.namespace, this.pipeline, {
...this.cursorOptions
});
}

_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
const aggregateOperation = new AggregateOperation(this.namespace, this.pipeline, {
...this.cursorOptions,
...this.options,
session
});

executeOperation<TODO_NODE_3286, ChangeStreamAggregateRawResult<TChange>>(
session.client,
aggregateOperation,
(err, response) => {
if (err || response == null) {
return callback(err);
}

const server = aggregateOperation.server;
if (
this.startAtOperationTime == null &&
this.resumeAfter == null &&
this.startAfter == null &&
maxWireVersion(server) >= 7
) {
this.startAtOperationTime = response.operationTime;
}

this._processBatch(response);

this.emit(ChangeStream.INIT, response);
this.emit(ChangeStream.RESPONSE);

// TODO: NODE-2882
callback(undefined, { server, session, response });
}
);
}

override _getMore(batchSize: number, callback: Callback): void {
super._getMore(batchSize, (err, response) => {
if (err) {
return callback(err);
}

this._processBatch(response as TODO_NODE_3286 as ChangeStreamAggregateRawResult<TChange>);

this.emit(ChangeStream.MORE, response);
this.emit(ChangeStream.RESPONSE);
callback(err, response);
});
}
}
Loading