Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export interface BulkResult {
* Keeps the state of a unordered batch so we can rewrite the results
* correctly after command execution
*
* @internal
* @public
*/
export class Batch {
originalZeroIndex: number;
Expand Down
24 changes: 18 additions & 6 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const CHANGE_DOMAIN_TYPES = {
const NO_RESUME_TOKEN_ERROR = new MongoError(
'A change stream document has been received that lacks a resume token (_id).'
);
const NO_CURSOR_ERROR = new MongoError('ChangeStream has no cursor');
const CHANGESTREAM_CLOSED_ERROR = new MongoError('ChangeStream is closed');

/** @public */
Expand Down Expand Up @@ -277,7 +278,7 @@ export class ChangeStream extends EventEmitter {
hasNext(callback?: Callback): Promise<void> | void {
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
if (err || !cursor) return cb(err || NO_CURSOR_ERROR); // failed to resume, raise an error
Comment thread
mbroadst marked this conversation as resolved.
Outdated
cursor.hasNext(cb);
});
});
Expand All @@ -287,8 +288,7 @@ export class ChangeStream extends EventEmitter {
next(callback?: Callback): Promise<void> | void {
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err) return cb(err); // failed to resume, raise an error
if (!cursor) return cb(new MongoError('Cursor is undefined'));
if (err || !cursor) return cb(err || NO_CURSOR_ERROR); // failed to resume, raise an error
cursor.next((error, change) => {
if (error) {
this[kResumeQueue].push(() => this.next(cb));
Expand Down Expand Up @@ -330,11 +330,23 @@ export class ChangeStream extends EventEmitter {
*/
stream(options?: CursorStreamOptions): Readable {
this.streamOptions = options;
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to stream');
}
if (!this.cursor) throw NO_CURSOR_ERROR;
return this.cursor.stream(options);
}

/**
* Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
*/
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err || NO_CURSOR_ERROR); // failed to resume, raise an error
return cursor.tryNext(cb);
});
});
}
}

/** @internal */
Expand Down
1 change: 0 additions & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ export abstract class AbstractCursor extends EventEmitter {

/**
* Try to get the next available document from the cursor or `null` if an empty batch is returned
* @internal
*/
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
Expand Down