Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export interface BulkResult {
* Keeps the state of a unordered batch so we can rewrite the results
* correctly after command execution
*
* @internal
* @public
*/
export class Batch {
originalZeroIndex: number;
Expand Down
35 changes: 26 additions & 9 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const CHANGE_DOMAIN_TYPES = {
const NO_RESUME_TOKEN_ERROR = new MongoError(
'A change stream document has been received that lacks a resume token (_id).'
);
const NO_CURSOR_ERROR = new MongoError('ChangeStream has no cursor');
const CHANGESTREAM_CLOSED_ERROR = new MongoError('ChangeStream is closed');

/** @public */
Expand Down Expand Up @@ -287,8 +288,7 @@ export class ChangeStream extends EventEmitter {
next(callback?: Callback): Promise<void> | void {
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err) return cb(err); // failed to resume, raise an error
if (!cursor) return cb(new MongoError('Cursor is undefined'));
if (err || !cursor) return cb(err); // failed to resume, raise an error
cursor.next((error, change) => {
if (error) {
this[kResumeQueue].push(() => this.next(cb));
Expand Down Expand Up @@ -330,11 +330,23 @@ export class ChangeStream extends EventEmitter {
*/
stream(options?: CursorStreamOptions): Readable {
this.streamOptions = options;
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to stream');
}
if (!this.cursor) throw NO_CURSOR_ERROR;
return this.cursor.stream(options);
}

/**
* Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
*/
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
return cursor.tryNext(cb);
});
});
}
}

/** @internal */
Expand Down Expand Up @@ -707,11 +719,16 @@ function getCursor(changeStream: ChangeStream, callback: Callback<ChangeStreamCu
function processResumeQueue(changeStream: ChangeStream, err?: Error) {
while (changeStream[kResumeQueue].length) {
const request = changeStream[kResumeQueue].pop();
if (changeStream[kClosed] && !err) {
request(CHANGESTREAM_CLOSED_ERROR);
return;
if (!err) {
if (changeStream[kClosed]) {
request(CHANGESTREAM_CLOSED_ERROR);
return;
}
if (!changeStream.cursor) {
request(NO_CURSOR_ERROR);
return;
}
}

request(err, changeStream.cursor);
}
}
1 change: 0 additions & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ export abstract class AbstractCursor extends EventEmitter {

/**
* Try to get the next available document from the cursor or `null` if an empty batch is returned
* @internal
*/
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
Expand Down
5 changes: 4 additions & 1 deletion test/functional/change_stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1924,7 +1924,10 @@ describe('Change Streams', function () {
});

it('when invoked using eventEmitter API', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
metadata: {
requires: { topology: 'replicaset', mongodb: '>=3.6' },
sessions: { skipLeakTests: true }
Comment thread
mbroadst marked this conversation as resolved.
Outdated
},
test: function (done) {
let closed = false;
const close = _err => {
Expand Down