Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 16 additions & 24 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type { MongoClient } from '../mongo_client';
import { TODO_NODE_3286, TypedEventEmitter } from '../mongo_types';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import { GetMoreOperation } from '../operations/get_more';
import { KillCursorsOperation } from '../operations/kill_cursors';
import { ReadConcern, ReadConcernLike } from '../read_concern';
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
Expand Down Expand Up @@ -118,7 +119,7 @@ export abstract class AbstractCursor<
/** @internal */
[kId]?: Long;
/** @internal */
[kSession]?: ClientSession;
[kSession]: ClientSession;
/** @internal */
[kServer]?: Server;
/** @internal */
Expand Down Expand Up @@ -187,6 +188,8 @@ export abstract class AbstractCursor<

if (options.session instanceof ClientSession) {
this[kSession] = options.session;
} else {
this[kSession] = this[kClient].startSession({ owner: this, explicit: false });
}
}

Expand Down Expand Up @@ -217,11 +220,11 @@ export abstract class AbstractCursor<
}

/** @internal */
get session(): ClientSession | undefined {
get session(): ClientSession {
return this[kSession];
}

set session(clientSession: ClientSession | undefined) {
set session(clientSession: ClientSession) {
this[kSession] = clientSession;
}

Expand Down Expand Up @@ -592,11 +595,12 @@ export abstract class AbstractCursor<
const session = this[kSession];
if (session) {
// We only want to end this session if we created it, and it hasn't ended yet
if (session.explicit === false && !session.hasEnded) {
session.endSession();
if (session.explicit === false) {
if (!session.hasEnded) {
session.endSession(() => null);
}
this[kSession] = this.client.startSession({ owner: this, explicit: false });
}

this[kSession] = undefined;
}
}

Expand Down Expand Up @@ -644,22 +648,10 @@ export abstract class AbstractCursor<
* a significant refactor.
*/
[kInit](callback: Callback<TSchema | null>): void {
if (this[kSession] == null) {
if (this[kClient].topology?.shouldCheckForSessionSupport()) {
return this[kClient].topology?.selectServer(ReadPreference.primaryPreferred, {}, err => {
if (err) return callback(err);
return this[kInit](callback);
});
} else if (this[kClient].topology?.hasSessionSupport()) {
this[kSession] = this[kClient].topology?.startSession({ owner: this, explicit: false });
}
}

this._initialize(this[kSession], (err, state) => {
if (state) {
const response = state.response;
this[kServer] = state.server;
this[kSession] = state.session;

if (response.cursor) {
this[kId] =
Expand Down Expand Up @@ -843,11 +835,11 @@ function cleanupCursor(
}

cursor[kKilled] = true;
server.killCursors(
cursorNs,
[cursorId],
{ ...pluckBSONSerializeOptions(cursor[kOptions]), session },
() => completeCleanup()

return executeOperation(
cursor[kClient],
new KillCursorsOperation(cursorId, cursorNs, server, { session }),
completeCleanup
);
}

Expand Down
88 changes: 64 additions & 24 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ import type { MONGO_CLIENT_EVENTS } from './constants';
import { Db, DbOptions } from './db';
import type { AutoEncrypter, AutoEncryptionOptions } from './deps';
import type { Encrypter } from './encrypter';
import { MongoInvalidArgumentError, MongoNotConnectedError } from './error';
import { MongoInvalidArgumentError } from './error';
import type { Logger, LoggerLevel } from './logger';
import { TypedEventEmitter } from './mongo_types';
import { connect } from './operations/connect';
import { PromiseProvider } from './promise_provider';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
import type { ReadPreference, ReadPreferenceMode } from './read_preference';
import { ReadPreference, ReadPreferenceMode } from './read_preference';
import type { TagSet } from './sdam/server_description';
import type { SrvPoller } from './sdam/srv_polling';
import type { Topology, TopologyEvents } from './sdam/topology';
import type { ClientSession, ClientSessionOptions } from './sessions';
import { ClientSession, ClientSessionOptions, ServerSessionPool } from './sessions';
import {
Callback,
ClientMetadata,
Expand Down Expand Up @@ -267,10 +267,16 @@ export type WithSessionCallback = (session: ClientSession) => Promise<any>;
/** @internal */
export interface MongoClientPrivate {
url: string;
sessions: Set<ClientSession>;
bsonOptions: BSONSerializeOptions;
namespace: MongoDBNamespace;
hasBeenClosed: boolean;
/**
* We keep a reference to the sessions that are acquired from the pool.
* - used to track and close all sessions in client.close() (which is non-standard behavior)
* - used to notify the leak checker in our tests if test author forgot to clean up explicit sessions
*/
readonly activeSessions: Set<ClientSession>;
readonly sessionPool: ServerSessionPool;
readonly options: MongoOptions;
readonly readConcern?: ReadConcern;
readonly writeConcern?: WriteConcern;
Expand Down Expand Up @@ -352,10 +358,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
// The internal state
this.s = {
url,
sessions: new Set(),
bsonOptions: resolveBSONOptions(this[kOptions]),
namespace: ns('admin'),
hasBeenClosed: false,
sessionPool: new ServerSessionPool(this),
activeSessions: new Set(),

get options() {
return client[kOptions];
Expand Down Expand Up @@ -470,23 +477,51 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {

return maybePromise(callback, callback => {
if (this.topology == null) {
// Do not connect just to end sessions
return callback();
}

// clear out references to old topology
const topology = this.topology;
this.topology = undefined;

topology.close({ force }, error => {
if (error) return callback(error);
const { encrypter } = this[kOptions];
if (encrypter) {
return encrypter.close(this, force, error => {
callback(error);
const activeSessionEnds = Array.from(this.s.activeSessions, session => session.endSession());
this.s.activeSessions.clear();

Promise.all(activeSessionEnds)
.then(() => {
const endSessions = Array.from(this.s.sessionPool.sessions, ({ id }) => id);
if (endSessions.length === 0) return;
return this.db('admin')
.command(
{ endSessions },
{ readPreference: ReadPreference.primaryPreferred, noResponse: true }
)
.then(() => null) // outcome does not matter
.catch(() => null); // outcome does not matter
})
.then(() => {
if (this.topology == null) {
return callback();
}
// clear out references to old topology
const topology = this.topology;
this.topology = undefined;

return new Promise<void>((resolve, reject) => {
topology.close({ force }, error => {
if (error) return reject(error);
const { encrypter } = this[kOptions];
if (encrypter) {
return encrypter.close(this, force, error => {
if (error) return reject(error);
resolve();
});
}
resolve();
});
});
}
callback();
});
})
.then(
() => callback(),
error => callback(error)
);
});
}

Expand Down Expand Up @@ -553,12 +588,17 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
startSession(): ClientSession;
startSession(options: ClientSessionOptions): ClientSession;
startSession(options?: ClientSessionOptions): ClientSession {
options = Object.assign({ explicit: true }, options);
if (!this.topology) {
throw new MongoNotConnectedError('MongoClient must be connected to start a session');
}

return this.topology.startSession(options, this.s.options);
const session = new ClientSession(
this,
this.s.sessionPool,
{ explicit: true, ...options },
this[kOptions]
);
this.s.activeSessions.add(session);
session.once('ended', () => {
this.s.activeSessions.delete(session);
});
return session;
}

/**
Expand Down
22 changes: 15 additions & 7 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,24 @@ export function executeOperation<
if (topology.hasSessionSupport()) {
if (session == null) {
owner = Symbol();
session = topology.startSession({ owner, explicit: false });
session = client.startSession({ owner, explicit: false });
} else if (session.hasEnded) {
return callback(new MongoExpiredSessionError('Use of expired sessions is not permitted'));
} else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
return callback(new MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later'));
}
} else if (session) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
return callback(new MongoCompatibilityError('Current topology does not support sessions'));
} else {
// no session support
if (session && session.explicit) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
return callback(new MongoCompatibilityError('Current topology does not support sessions'));
} else if (session && !session.explicit) {
// We do not have to worry about ending the session because the server session has not been acquired yet
delete operation.options.session;
operation.clearSession();
session = undefined;
}
}

try {
Expand Down Expand Up @@ -166,8 +174,8 @@ function executeWithServerSelection<TResult>(

let selector: ReadPreference | ServerSelector;

if (operation.hasAspect(Aspect.CURSOR_ITERATING)) {
// Get more operations must always select the same server, but run through
if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) {
// GetMore and KillCursor operations must always select the same server, but run through
// server selection to potentially force monitor checks if the server is
// in an unknown state.
selector = sameServerSelector(operation.server?.description);
Expand Down
2 changes: 1 addition & 1 deletion src/operations/get_more.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ export class GetMoreOperation extends AbstractOperation {
}
}

defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.CURSOR_ITERATING]);
defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.MUST_SELECT_SAME_SERVER]);
27 changes: 27 additions & 0 deletions src/operations/kill_cursors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import type { Long } from '../bson';
import { MongoRuntimeError } from '../error';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback, MongoDBNamespace } from '../utils';
import { AbstractOperation, Aspect, defineAspects, OperationOptions } from './operation';

export class KillCursorsOperation extends AbstractOperation {
cursorId: Long;
constructor(cursorId: Long, ns: MongoDBNamespace, server: Server, options: OperationOptions) {
super(options);
this.ns = ns;
this.cursorId = cursorId;
this.server = server;
}

execute(server: Server, session: ClientSession | undefined, callback: Callback<void>): void {
if (server !== this.server) {
return callback(
new MongoRuntimeError('Killcursor must run on the same server operation began on')
);
}
server.killCursors(this.ns, [this.cursorId], { session }, () => callback());
}
}

defineAspects(KillCursorsOperation, [Aspect.MUST_SELECT_SAME_SERVER]);
6 changes: 5 additions & 1 deletion src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export const Aspect = {
EXPLAINABLE: Symbol('EXPLAINABLE'),
SKIP_COLLATION: Symbol('SKIP_COLLATION'),
CURSOR_CREATING: Symbol('CURSOR_CREATING'),
CURSOR_ITERATING: Symbol('CURSOR_ITERATING')
MUST_SELECT_SAME_SERVER: Symbol('MUST_SELECT_SAME_SERVER')
} as const;

/** @public */
Expand Down Expand Up @@ -94,6 +94,10 @@ export abstract class AbstractOperation<TResult = any> {
return this[kSession];
}

clearSession() {
this[kSession] = undefined;
}

get canRetryRead(): boolean {
return true;
}
Expand Down
Loading