Skip to content
Merged
20 changes: 9 additions & 11 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
import { Readable, Transform } from 'stream';
import type { ExecutionResult } from '../operations/execute_operation';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import { GetMoreOperation } from '../operations/get_more';
import { ReadConcern, ReadConcernLike } from '../read_concern';
import { TODO_NODE_3286, TypedEventEmitter } from '../mongo_types';

Expand Down Expand Up @@ -610,16 +611,13 @@ export abstract class AbstractCursor<
return;
}

server.getMore(
cursorNs,
cursorId,
{
...this[kOptions],
session: this[kSession],
batchSize
},
callback
);
const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, {
Comment thread
dariakp marked this conversation as resolved.
...this[kOptions],
session: this[kSession],
batchSize
});

executeOperation(this.topology, getMoreOperation, callback);
}
}

Expand Down
17 changes: 13 additions & 4 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import type { Document } from '../bson';
import { supportsRetryableWrites } from '../utils';
import { secondaryWritableServerSelector, ServerSelector } from '../sdam/server_selection';
import {
sameServerSelector,
secondaryWritableServerSelector,
ServerSelector
} from '../sdam/server_selection';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
Expand Down Expand Up @@ -153,9 +157,14 @@ function executeWithServerSelection(

let selector: ReadPreference | ServerSelector;

// If operation should try to write to secondary use the custom server selector
// otherwise provide the read preference.
if (operation.trySecondaryWrite) {
if (operation.hasAspect(Aspect.CURSOR_ITERATING)) {
Comment thread
dariakp marked this conversation as resolved.
// Get more operations must always select the same server, but run through
// server selection to potentially force monitor checks if the server is
// in an unknown state.
selector = sameServerSelector(operation.server?.description);
} else if (operation.trySecondaryWrite) {
// If operation should try to write to secondary use the custom server selector
// otherwise provide the read preference.
selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
} else {
selector = readPreference;
Expand Down
49 changes: 49 additions & 0 deletions src/operations/get_more.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import type { Document, Long } from '../bson';
import { MongoRuntimeError } from '../error';
import type { Callback, MongoDBNamespace } from '../utils';
import type { Server } from '../sdam/server';
import { Aspect, AbstractOperation, OperationOptions, defineAspects } from './operation';
import type { ClientSession } from '../sessions';

/**
* @public
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export interface GetMoreOptions extends OperationOptions {
/** Set the batchSize for the getMoreCommand when iterating over the query results. */
batchSize?: number;
/** You can put a $comment field on a query to make looking in the profiler logs simpler. */
comment?: string | Document;
/** Number of milliseconds to wait before aborting the query. */
maxTimeMS?: number;
}

/** @internal */
export class GetMoreOperation extends AbstractOperation {
cursorId: Long;
options: GetMoreOptions;
server: Server;

constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) {
super(options);
this.options = options;
this.ns = ns;
this.cursorId = cursorId;
this.server = server;
}

/**
* Although there is a server already associated with the get more operation, the signature
* for execute passes a server so we will just use that one.
Comment thread
nbbeeken marked this conversation as resolved.
*/
execute(server: Server, session: ClientSession, callback: Callback<Document>): void {
if (server !== this.server) {
return callback(
new MongoRuntimeError('Getmore must run on the same server operation began on')
);
}
server.getMore(this.ns, this.cursorId, this.options, callback);
}
}

defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.CURSOR_ITERATING]);
3 changes: 2 additions & 1 deletion src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ export const Aspect = {
RETRYABLE: Symbol('RETRYABLE'),
EXPLAINABLE: Symbol('EXPLAINABLE'),
SKIP_COLLATION: Symbol('SKIP_COLLATION'),
CURSOR_CREATING: Symbol('CURSOR_CREATING')
CURSOR_CREATING: Symbol('CURSOR_CREATING'),
CURSOR_ITERATING: Symbol('CURSOR_ITERATING')
} as const;

/** @public */
Expand Down
18 changes: 18 additions & 0 deletions src/sdam/server_selection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,24 @@ export function writableServerSelector(): ServerSelector {
);
}

/**
* The purpose of this selector is to select the same server, only
* if it is in a state that it can have commands sent to it.
*/
export function sameServerSelector(description?: ServerDescription): ServerSelector {
Comment thread
dariakp marked this conversation as resolved.
return (
topologyDescription: TopologyDescription,
servers: ServerDescription[]
): ServerDescription[] => {
if (!description) return [];
// Filter the servers to match the provided description only if
// the type is not unknown.
return servers.filter(sd => {
return sd.address === description.address && sd.type !== ServerType.Unknown;
});
};
}

/**
* Returns a server selector that uses a read preference to select a
* server potentially for a write on a secondary.
Expand Down
118 changes: 118 additions & 0 deletions test/unit/operations/get_more.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
'use strict';

const sinon = require('sinon');
const { expect } = require('chai');
const { Long } = require('../../../src/bson');
const { GetMoreOperation } = require('../../../src/operations/get_more');
const { Server } = require('../../../src/sdam/server');
const { ClientSession } = require('../../../src/sessions');
const { ReadPreference } = require('../../../src/read_preference');
const { Aspect } = require('../../../src/operations/operation');
const { MongoRuntimeError } = require('../../../src/error');

describe('GetMoreOperation', function () {
Comment thread
dariakp marked this conversation as resolved.
const ns = 'db.coll';
const cursorId = Long.fromNumber(1);
const options = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we freeze these so that we can make sure the options aren't mutated? (otherwise that deep equal check below doesn't guarantee as much)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now all params are frozen.

batchSize: 100,
comment: 'test',
maxTimeMS: 500,
readPreference: ReadPreference.primary
};

describe('#constructor', function () {
const server = sinon.createStubInstance(Server, {});
const operation = new GetMoreOperation(ns, cursorId, server, options);

it('sets the namespace', function () {
expect(operation.ns).to.equal(ns);
});

it('sets the cursorId', function () {
expect(operation.cursorId).to.equal(cursorId);
});

it('sets the server', function () {
expect(operation.server).to.equal(server);
});

it('sets the options', function () {
expect(operation.options).to.deep.equal(options);
});
});

describe('#execute', function () {
context('when the server is the same as the instance', function () {
const getMoreStub = sinon.stub().yields(undefined);
const server = sinon.createStubInstance(Server, {
getMore: getMoreStub
});
const session = sinon.createStubInstance(ClientSession);
const opts = { ...options, session };
const operation = new GetMoreOperation(ns, cursorId, server, opts);

it('executes a getmore on the provided server', function (done) {
const callback = () => {
const call = getMoreStub.getCall(0);
expect(getMoreStub.calledOnce).to.be.true;
expect(call.args[0]).to.equal(ns);
expect(call.args[1]).to.equal(cursorId);
expect(call.args[2]).to.deep.equal(opts);
done();
};
operation.execute(server, session, callback);
});
});

context('when the server is not the same as the instance', function () {
const getMoreStub = sinon.stub().yields(undefined);
const server = sinon.createStubInstance(Server, {
getMore: getMoreStub
});
const newServer = sinon.createStubInstance(Server, {
getMore: getMoreStub
});
const session = sinon.createStubInstance(ClientSession);
const opts = { ...options, session };
const operation = new GetMoreOperation(ns, cursorId, server, opts);

it('errors in the callback', function (done) {
const callback = error => {
expect(error).to.be.instanceOf(MongoRuntimeError);
expect(error.message).to.equal('Getmore must run on the same server operation began on');
done();
};
operation.execute(newServer, session, callback);
});
});
});

describe('#hasAspect', function () {
const server = sinon.createStubInstance(Server, {});
const operation = new GetMoreOperation(ns, cursorId, server, options);

context('when the aspect is cursor iterating', function () {
it('returns true', function () {
expect(operation.hasAspect(Aspect.CURSOR_ITERATING)).to.be.true;
});
});

context('when the aspect is read', function () {
it('returns true', function () {
expect(operation.hasAspect(Aspect.READ_OPERATION)).to.be.true;
});
});

context('when the aspect is write', function () {
it('returns false', function () {
expect(operation.hasAspect(Aspect.WRITE_OPERATION)).to.be.false;
});
});

context('when the aspect is retryable', function () {
it('returns false', function () {
expect(operation.hasAspect(Aspect.RETRYABLE)).to.be.false;
});
});
});
});
Loading