Skip to content

Commit

Permalink
feat(NODE-6231): Add CSOT behaviour for retryable reads and writes (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
W-A-James authored and nbbeeken committed Sep 27, 2024
1 parent 4999789 commit 3d68e8c
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 21 deletions.
9 changes: 5 additions & 4 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,10 @@ async function tryOperation<
session.incrementTransactionNumber();
}

// TODO(NODE-6231): implement infinite retry within CSOT timeout here
const maxTries = willRetry ? 2 : 1;
const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
let previousOperationError: MongoError | undefined;
let previousServer: ServerDescription | undefined;

// TODO(NODE-6231): implement infinite retry within CSOT timeout here
for (let tries = 0; tries < maxTries; tries++) {
if (previousOperationError) {
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
Expand Down Expand Up @@ -276,7 +274,6 @@ async function tryOperation<
return await operation.execute(server, session, timeoutContext);
} catch (operationError) {
if (!(operationError instanceof MongoError)) throw operationError;

if (
previousOperationError != null &&
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
Expand All @@ -285,6 +282,10 @@ async function tryOperation<
}
previousServer = server.description;
previousOperationError = operationError;

// Reset timeouts
timeoutContext.serverSelectionTimeout?.clear();
timeoutContext.connectionCheckoutTimeout?.clear();
}
}

Expand Down
26 changes: 17 additions & 9 deletions src/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export class Timeout extends Promise<never> {
public ended: number | null = null;
public duration: number;
public timedOut = false;
public cleared = false;

get remainingTime(): number {
if (this.timedOut) return 0;
Expand All @@ -53,7 +54,6 @@ export class Timeout extends Promise<never> {
/** Create a new timeout that expires in `duration` ms */
private constructor(executor: Executor = () => null, duration: number, unref = true) {
let reject!: Reject;

if (duration < 0) {
throw new MongoInvalidArgumentError('Cannot create a Timeout with a negative duration');
}
Expand Down Expand Up @@ -86,6 +86,7 @@ export class Timeout extends Promise<never> {
clear(): void {
clearTimeout(this.id);
this.id = undefined;
this.cleared = true;
}

throwIfExpired(): void {
Expand Down Expand Up @@ -213,16 +214,20 @@ export class CSOTTimeoutContext extends TimeoutContext {

get serverSelectionTimeout(): Timeout | null {
// check for undefined
if (typeof this._serverSelectionTimeout !== 'object') {
if (typeof this._serverSelectionTimeout !== 'object' || this._serverSelectionTimeout?.cleared) {
const { remainingTimeMS, serverSelectionTimeoutMS } = this;
if (remainingTimeMS <= 0)
throw new MongoOperationTimeoutError(
`Timed out in server selection after ${this.timeoutMS}ms`
);
const usingServerSelectionTimeoutMS =
this.serverSelectionTimeoutMS !== 0 &&
csotMin(this.timeoutMS, this.serverSelectionTimeoutMS) === this.serverSelectionTimeoutMS;

serverSelectionTimeoutMS !== 0 &&
csotMin(remainingTimeMS, serverSelectionTimeoutMS) === serverSelectionTimeoutMS;
if (usingServerSelectionTimeoutMS) {
this._serverSelectionTimeout = Timeout.expires(this.serverSelectionTimeoutMS);
this._serverSelectionTimeout = Timeout.expires(serverSelectionTimeoutMS);
} else {
if (this.timeoutMS > 0) {
this._serverSelectionTimeout = Timeout.expires(this.timeoutMS);
if (remainingTimeMS > 0 && Number.isFinite(remainingTimeMS)) {
this._serverSelectionTimeout = Timeout.expires(remainingTimeMS);
} else {
this._serverSelectionTimeout = null;
}
Expand All @@ -233,7 +238,10 @@ export class CSOTTimeoutContext extends TimeoutContext {
}

get connectionCheckoutTimeout(): Timeout | null {
if (typeof this._connectionCheckoutTimeout !== 'object') {
if (
typeof this._connectionCheckoutTimeout !== 'object' ||
this._connectionCheckoutTimeout?.cleared
) {
if (typeof this._serverSelectionTimeout === 'object') {
// null or Timeout
this._connectionCheckoutTimeout = this._serverSelectionTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';
const enabled = [
'override-collection-timeoutMS',
'override-database-timeoutMS',
'override-operation-timeoutMS'
'override-operation-timeoutMS',
'retryability-legacy-timeouts',
'retryability-timeoutMS'
];

const cursorOperations = [
Expand All @@ -18,6 +20,11 @@ const cursorOperations = [
'listCollectionNames'
];

const bulkWriteOperations = [
'timeoutMS applies to whole operation, not individual attempts - bulkWrite on collection',
'timeoutMS applies to whole operation, not individual attempts - insertMany on collection'
];

describe('CSOT spec tests', function () {
const specs = loadSpecTests(join('client-side-operations-timeout'));
for (const spec of specs) {
Expand All @@ -30,6 +37,10 @@ describe('CSOT spec tests', function () {
// Cursor operation
if (test.operations.find(operation => cursorOperations.includes(operation.name)))
test.skipReason = 'TODO(NODE-5684): Not working yet';

if (bulkWriteOperations.includes(test.description))
test.skipReason =
'TODO(NODE-6274): update test runner to check errorResponse field of MongoBulkWriteError in isTimeoutError assertion';
}
}
runUnifiedSuite(specs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import { expect } from 'chai';
import * as sinon from 'sinon';

import { ConnectionPool, type MongoClient, Timeout, Topology } from '../../mongodb';
import { ConnectionPool, type MongoClient, Timeout, TimeoutContext, Topology } from '../../mongodb';

// TODO(NODE-5824): Implement CSOT prose tests
describe('CSOT spec unit tests', function () {
Expand All @@ -22,10 +22,16 @@ describe('CSOT spec unit tests', function () {
it('Operations should ignore waitQueueTimeoutMS if timeoutMS is also set.', async function () {
client = this.configuration.newClient({ waitQueueTimeoutMS: 999999, timeoutMS: 10000 });
sinon.spy(Timeout, 'expires');
const timeoutContextSpy = sinon.spy(TimeoutContext, 'create');

await client.db('db').collection('collection').insertOne({ x: 1 });

expect(Timeout.expires).to.have.been.calledWith(10000);
const createCalls = timeoutContextSpy.getCalls().filter(
// @ts-expect-error accessing concrete field
call => call.args[0].timeoutMS === 10000
);

expect(createCalls).to.have.length.greaterThanOrEqual(1);
expect(Timeout.expires).to.not.have.been.calledWith(999999);
});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/* Anything javascript specific relating to timeouts */
import { expect } from 'chai';
import * as sinon from 'sinon';

import {
type ClientSession,
Expand All @@ -13,10 +12,6 @@ import {
} from '../../mongodb';

describe('CSOT driver tests', () => {
afterEach(() => {
sinon.restore();
});

describe('timeoutMS inheritance', () => {
let client: MongoClient;
let db: Db;
Expand Down
2 changes: 2 additions & 0 deletions test/tools/unified-spec-runner/match.ts
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,8 @@ export function expectErrorCheck(
expect(error).to.be.instanceof(MongoOperationTimeoutError);
}

// TODO(NODE-6274): Check for MongoBulkWriteErrors that have a MongoOperationTimeoutError in their
// errorResponse field
if (expected.isTimeoutError === false) {
expect(error).to.not.be.instanceof(MongoOperationTimeoutError);
} else if (expected.isTimeoutError === true) {
Expand Down

0 comments on commit 3d68e8c

Please sign in to comment.