Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6389): add support for timeoutMS in StateMachine.execute() #4243

Merged
merged 44 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
3c2ec0a
feat(NODE-6090): Implement CSOT logic for connection checkout and ser…
W-A-James Apr 11, 2024
909578f
test(NODE-6120): Implement Unified test runner changes for CSOT (#4121)
W-A-James Jun 10, 2024
e101750
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
W-A-James Jun 21, 2024
e4efd3f
refactor(NODE-6230): executeOperation to use iterative retry mechanis…
nbbeeken Jul 22, 2024
22082c9
feat(NODE-5682): set maxTimeMS on commands and preempt I/O (#4174)
nbbeeken Jul 26, 2024
bf95fa4
feat(NODE-6231): Add CSOT behaviour for retryable reads and writes (#…
W-A-James Aug 1, 2024
c63d102
feat(NODE-6312): add error transformation for server timeouts (#4192)
nbbeeken Aug 12, 2024
1eab23d
feat(NODE-6313): add CSOT support to sessions and transactions (#4199)
nbbeeken Sep 9, 2024
4c4b0a9
feat(NODE-6304): add CSOT support for non-tailable cursors (#4195)
W-A-James Sep 12, 2024
558d416
fix(NODE-6374): MongoOperationTimeoutError inherits MongoRuntimeError…
nbbeeken Sep 12, 2024
3ed4a14
test: remove empty skipped context blocks (#4238)
W-A-James Sep 12, 2024
d3438ea
feat(NODE-5844): add iscryptd to ServerDescription (#4239)
nbbeeken Sep 17, 2024
ff561e3
temp
aditi-khare-mongoDB Sep 19, 2024
164780c
temp
aditi-khare-mongoDB Sep 20, 2024
12a7e2e
chore: plumb timeoutMS around more
nbbeeken Sep 24, 2024
999f23d
feat(NODE-6090): Implement CSOT logic for connection checkout and ser…
W-A-James Apr 11, 2024
0355404
test(NODE-6120): Implement Unified test runner changes for CSOT (#4121)
W-A-James Jun 10, 2024
5ef3d69
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
W-A-James Jun 21, 2024
7139b8f
refactor(NODE-6230): executeOperation to use iterative retry mechanis…
nbbeeken Jul 22, 2024
acfb4fc
feat(NODE-5682): set maxTimeMS on commands and preempt I/O (#4174)
nbbeeken Jul 26, 2024
4efff95
feat(NODE-6231): Add CSOT behaviour for retryable reads and writes (#…
W-A-James Aug 1, 2024
1997f81
feat(NODE-6312): add error transformation for server timeouts (#4192)
nbbeeken Aug 12, 2024
cc3ef8f
feat(NODE-6313): add CSOT support to sessions and transactions (#4199)
nbbeeken Sep 9, 2024
38affae
feat(NODE-6304): add CSOT support for non-tailable cursors (#4195)
W-A-James Sep 12, 2024
738188b
fix(NODE-6374): MongoOperationTimeoutError inherits MongoRuntimeError…
nbbeeken Sep 12, 2024
c4a7c2c
test: remove empty skipped context blocks (#4238)
W-A-James Sep 12, 2024
5aa6d4c
feat(NODE-5844): add iscryptd to ServerDescription (#4239)
nbbeeken Sep 17, 2024
17a2fde
chore: allow clientBulkWrite to use TimeoutContext (#4251)
W-A-James Sep 25, 2024
aead2f1
Merge branch 'NODE-6090' into NODE-6389
aditi-khare-mongoDB Sep 25, 2024
88ca990
half testing
aditi-khare-mongoDB Sep 26, 2024
2e3a84c
revert state machine test changes
aditi-khare-mongoDB Sep 30, 2024
e6e9fb4
requested changes
aditi-khare-mongoDB Oct 1, 2024
3dc383b
Merge branch 'NODE-6090' into NODE-6389
aditi-khare-mongoDB Oct 1, 2024
702a03e
lint fix
aditi-khare-mongoDB Oct 1, 2024
3b6a23b
test fix
aditi-khare-mongoDB Oct 1, 2024
5560a1b
no negative timeouts
aditi-khare-mongoDB Oct 2, 2024
601c159
requested changes
aditi-khare-mongoDB Oct 2, 2024
096f154
fix failing tests
aditi-khare-mongoDB Oct 3, 2024
5aba790
requested changes 3
aditi-khare-mongoDB Oct 3, 2024
903e0d0
limit flaky tests
aditi-khare-mongoDB Oct 3, 2024
709f725
Merge branch 'NODE-6090' into NODE-6389
aditi-khare-mongoDB Oct 4, 2024
cb12f64
lint fix
aditi-khare-mongoDB Oct 7, 2024
01aca89
Merge branch 'NODE-6090' into NODE-6389
baileympearson Oct 7, 2024
6ea56d3
Merge branch 'NODE-6090' into NODE-6389
baileympearson Oct 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion etc/notes/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Children of `MongoError` include:
### `MongoDriverError`

This class represents errors which originate in the driver itself or when the user incorrectly uses the driver. This class should **never** be directly instantiated.
Its children are the main classes of errors that most users will interact with: [**`MongoAPIError`**](#MongoAPIError) and [**`MongoRuntimeError`**](#MongoRuntimeError).
Its children are the main classes of errors that most users will interact with: [**`MongoAPIError`**](#MongoAPIError), [**`MongoRuntimeError`**](#MongoRuntimeError) and [**`MongoOperationTimeoutError`**](#MongoOperationTimeoutError).

### `MongoAPIError`

Expand Down Expand Up @@ -109,6 +109,10 @@ This class should **never** be directly instantiated.
| **MongoGridFSChunkError** | Thrown when a malformed or invalid chunk is encountered when reading from a GridFS Stream. |
| **MongoUnexpectedServerResponseError** | Thrown when the driver receives a **parsable** response it did not expect from the server. |

### `MongoOperationTimeoutError`

- TODO(NODE-5688): Add MongoOperationTimeoutError documentation

### MongoUnexpectedServerResponseError

Intended for the scenario where the MongoDB returns an unexpected response in relation to some state the driver is in.
Expand Down
82 changes: 41 additions & 41 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
"mocha": "^10.4.0",
"mocha-sinon": "^2.1.2",
"mongodb-client-encryption": "^6.1.0",
"mongodb-legacy": "^6.1.0",
"mongodb-legacy": "^6.1.1",
"nyc": "^15.1.0",
"prettier": "^3.3.3",
"semver": "^7.6.3",
Expand Down
8 changes: 6 additions & 2 deletions src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ export class Admin {
new RunAdminCommandOperation(command, {
...resolveBSONOptions(options),
session: options?.session,
readPreference: options?.readPreference
readPreference: options?.readPreference,
timeoutMS: options?.timeoutMS ?? this.s.db.timeoutMS
})
);
}
Expand Down Expand Up @@ -154,7 +155,10 @@ export class Admin {
* @param options - Optional settings for the command
*/
async listDatabases(options?: ListDatabasesOptions): Promise<ListDatabasesResult> {
return await executeOperation(this.s.db.client, new ListDatabasesOperation(this.s.db, options));
return await executeOperation(
this.s.db.client,
new ListDatabasesOperation(this.s.db, { timeoutMS: this.s.db.timeoutMS, ...options })
);
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../o
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import {
applyRetryableWrites,
type Callback,
Expand Down Expand Up @@ -873,6 +874,9 @@ export interface BulkWriteOptions extends CommandOperationOptions {
forceServerObjectId?: boolean;
/** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
let?: Document;

/** @internal */
timeoutContext?: TimeoutContext;
}

const executeCommandsAsync = promisify(executeCommands);
Expand Down
97 changes: 71 additions & 26 deletions src/client-side-encryption/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import {
} from '../bson';
import { type ProxyOptions } from '../cmap/connection';
import { getSocks, type SocksLib } from '../deps';
import { MongoOperationTimeoutError } from '../error';
import { type MongoClient, type MongoClientOptions } from '../mongo_client';
import { type CSOTTimeoutContext, Timeout, TimeoutError } from '../timeout';
import { BufferPool, MongoDBCollectionNamespace, promiseWithResolvers } from '../utils';
import { autoSelectSocketOptions, type DataKey } from './client_encryption';
import { MongoCryptError } from './errors';
Expand Down Expand Up @@ -182,7 +184,11 @@ export class StateMachine {
/**
* Executes the state machine according to the specification
*/
async execute(executor: StateMachineExecutable, context: MongoCryptContext): Promise<Uint8Array> {
async execute(
executor: StateMachineExecutable,
context: MongoCryptContext,
timeoutContext?: CSOTTimeoutContext
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
): Promise<Uint8Array> {
const keyVaultNamespace = executor._keyVaultNamespace;
const keyVaultClient = executor._keyVaultClient;
const metaDataClient = executor._metaDataClient;
Expand All @@ -201,8 +207,13 @@ export class StateMachine {
'unreachable state machine state: entered MONGOCRYPT_CTX_NEED_MONGO_COLLINFO but metadata client is undefined'
);
}
const collInfo = await this.fetchCollectionInfo(metaDataClient, context.ns, filter);

const collInfo = await this.fetchCollectionInfo(
metaDataClient,
context.ns,
filter,
timeoutContext?.csotEnabled() ? timeoutContext.remainingTimeMS : null
);
if (collInfo) {
context.addMongoOperationResponse(collInfo);
}
Expand All @@ -222,9 +233,20 @@ export class StateMachine {
// When we are using the shared library, we don't have a mongocryptd manager.
const markedCommand: Uint8Array = mongocryptdManager
? await mongocryptdManager.withRespawn(
this.markCommand.bind(this, mongocryptdClient, context.ns, command)
this.markCommand.bind(
this,
mongocryptdClient,
context.ns,
command,
timeoutContext?.csotEnabled() ? timeoutContext.remainingTimeMS : null
)
)
: await this.markCommand(mongocryptdClient, context.ns, command);
: await this.markCommand(
mongocryptdClient,
context.ns,
command,
timeoutContext?.csotEnabled() ? timeoutContext.remainingTimeMS : null
);

context.addMongoOperationResponse(markedCommand);
context.finishMongoOperation();
Expand All @@ -233,7 +255,12 @@ export class StateMachine {

case MONGOCRYPT_CTX_NEED_MONGO_KEYS: {
const filter = context.nextMongoOperation();
const keys = await this.fetchKeys(keyVaultClient, keyVaultNamespace, filter);
const keys = await this.fetchKeys(
keyVaultClient,
keyVaultNamespace,
filter,
timeoutContext?.csotEnabled() ? timeoutContext.remainingTimeMS : null
);

if (keys.length === 0) {
// See docs on EMPTY_V
Expand All @@ -255,9 +282,12 @@ export class StateMachine {
}
baileympearson marked this conversation as resolved.
Show resolved Hide resolved

case MONGOCRYPT_CTX_NEED_KMS: {
const requests = Array.from(this.requests(context));
await Promise.all(requests);

await Promise.all(
this.requests(
context,
timeoutContext?.csotEnabled() ? timeoutContext.remainingTimeMS : null
)
);
context.finishKMSRequests();
break;
}
Expand Down Expand Up @@ -299,7 +329,7 @@ export class StateMachine {
* @param kmsContext - A C++ KMS context returned from the bindings
* @returns A promise that resolves when the KMS reply has be fully parsed
*/
async kmsRequest(request: MongoCryptKMSRequest): Promise<void> {
async kmsRequest(request: MongoCryptKMSRequest, timeoutMS?: number | null): Promise<void> {
const parsedUrl = request.endpoint.split(':');
const port = parsedUrl[1] != null ? Number.parseInt(parsedUrl[1], 10) : HTTPS_PORT;
const socketOptions = autoSelectSocketOptions(this.options.socketOptions || {});
Expand Down Expand Up @@ -329,10 +359,6 @@ export class StateMachine {
}
}

function ontimeout() {
return new MongoCryptError('KMS request timed out');
}

function onerror(cause: Error) {
return new MongoCryptError('KMS request failed', { cause });
}
Expand Down Expand Up @@ -364,7 +390,6 @@ export class StateMachine {
resolve: resolveOnNetSocketConnect
} = promiseWithResolvers<void>();
netSocket
.once('timeout', () => rejectOnNetSocketError(ontimeout()))
.once('error', err => rejectOnNetSocketError(onerror(err)))
.once('close', () => rejectOnNetSocketError(onclose()))
.once('connect', () => resolveOnNetSocketConnect());
Expand Down Expand Up @@ -410,8 +435,8 @@ export class StateMachine {
reject: rejectOnTlsSocketError,
resolve
} = promiseWithResolvers<void>();

socket
.once('timeout', () => rejectOnTlsSocketError(ontimeout()))
.once('error', err => rejectOnTlsSocketError(onerror(err)))
.once('close', () => rejectOnTlsSocketError(onclose()))
.on('data', data => {
Expand All @@ -425,20 +450,27 @@ export class StateMachine {
resolve();
}
});
await willResolveKmsRequest;

await (typeof timeoutMS === 'number'
? Promise.all([willResolveKmsRequest, Timeout.expires(timeoutMS)])
: willResolveKmsRequest);
} catch (error) {
if (error instanceof TimeoutError)
throw new MongoOperationTimeoutError('KMS request timed out');
throw error;
} finally {
// There's no need for any more activity on this socket at this point.
destroySockets();
}
}

*requests(context: MongoCryptContext) {
*requests(context: MongoCryptContext, timeoutMS?: number | null) {
for (
let request = context.nextKMSRequest();
request != null;
request = context.nextKMSRequest()
) {
yield this.kmsRequest(request);
yield this.kmsRequest(request, timeoutMS);
}
}

Expand Down Expand Up @@ -498,15 +530,19 @@ export class StateMachine {
async fetchCollectionInfo(
client: MongoClient,
ns: string,
filter: Document
filter: Document,
timeoutMS?: number | null
): Promise<Uint8Array | null> {
const { db } = MongoDBCollectionNamespace.fromString(ns);

const collections = await client
.db(db)
.listCollections(filter, {
promoteLongs: false,
promoteValues: false
promoteValues: false,
...(typeof timeoutMS === 'number'
? { timeoutMS, timeoutMode: 'cursorLifetime' }
: undefined)
})
.toArray();

Expand All @@ -522,12 +558,20 @@ export class StateMachine {
* @param command - The command to execute.
* @param callback - Invoked with the serialized and marked bson command, or with an error
*/
async markCommand(client: MongoClient, ns: string, command: Uint8Array): Promise<Uint8Array> {
const options = { promoteLongs: false, promoteValues: false };
async markCommand(
client: MongoClient,
ns: string,
command: Uint8Array,
timeoutMS?: number | null
): Promise<Uint8Array> {
const { db } = MongoDBCollectionNamespace.fromString(ns);
const rawCommand = deserialize(command, options);
const bsonOptions = { promoteLongs: false, promoteValues: false };
const rawCommand = deserialize(command, bsonOptions);

const response = await client.db(db).command(rawCommand, options);
const response = await client.db(db).command(rawCommand, {
...bsonOptions,
...(typeof timeoutMS === 'number' ? { timeoutMS, omitMaxTimeMS: true } : undefined)
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
});

return serialize(response, this.bsonOptions);
}
Expand All @@ -543,15 +587,16 @@ export class StateMachine {
fetchKeys(
client: MongoClient,
keyVaultNamespace: string,
filter: Uint8Array
filter: Uint8Array,
timeoutMS?: number | null
): Promise<Array<DataKey>> {
const { db: dbName, collection: collectionName } =
MongoDBCollectionNamespace.fromString(keyVaultNamespace);

return client
.db(dbName)
.collection<DataKey>(collectionName, { readConcern: { level: 'majority' } })
.find(deserialize(filter))
.find(deserialize(filter), { timeoutMS: timeoutMS != null ? timeoutMS : undefined })
.toArray();
}
}
Loading