Skip to content

Commit

Permalink
Basic implementation done, need to UTR changes + sync spec tests still
Browse files Browse the repository at this point in the history
synced new test files

added support for error response

added api docs

made MongoServerError.errorResponse required + casted resulting type errors

test(NODE-5992): fix env var restoration in tests (#4017)

refactor(NODE-5903): add newline to stdio logging (#4018)

fix(NODE-5985): throw Nodejs' certificate expired error when TLS fails to connect instead of `CERT_HAS_EXPIRED` (#4014)

test(NODE-5962): gossip cluster time in utr (#4019)

chore(NODE-5997): update saslprep to ^1.1.5 (#4023)

feat(NODE-5968): container and Kubernetes awareness in client metadata (#4005)

fix(NODE-5993): memory leak in the `Connection` class (#4022)

added TODO(NODE-XXXX)
  • Loading branch information
aditi-khare-mongoDB committed Mar 8, 2024
1 parent eab8f23 commit fd31d37
Show file tree
Hide file tree
Showing 51 changed files with 1,703 additions and 145 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"email": "[email protected]"
},
"dependencies": {
"@mongodb-js/saslprep": "^1.1.0",
"@mongodb-js/saslprep": "^1.1.5",
"bson": "^6.4.0",
"mongodb-connection-string-url": "^3.0.0"
},
Expand Down
11 changes: 3 additions & 8 deletions src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import {
type ConnectionOptions,
CryptoConnection
} from './connection';
import type { ClientMetadata } from './handshake/client_metadata';
import {
MAX_SUPPORTED_SERVER_VERSION,
MAX_SUPPORTED_WIRE_VERSION,
Expand Down Expand Up @@ -183,7 +182,7 @@ export interface HandshakeDocument extends Document {
ismaster?: boolean;
hello?: boolean;
helloOk?: boolean;
client: ClientMetadata;
client: Document;
compression: string[];
saslSupportedMechs?: string;
loadBalanced?: boolean;
Expand All @@ -200,11 +199,12 @@ export async function prepareHandshakeDocument(
const options = authContext.options;
const compressors = options.compressors ? options.compressors : [];
const { serverApi } = authContext.connection;
const clientMetadata: Document = await options.extendedMetadata;

const handshakeDoc: HandshakeDocument = {
[serverApi?.version || options.loadBalanced === true ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
helloOk: true,
client: options.metadata,
client: clientMetadata,
compression: compressors
};

Expand Down Expand Up @@ -319,7 +319,6 @@ export async function makeSocket(options: MakeConnectionOptions): Promise<Stream
const useTLS = options.tls ?? false;
const noDelay = options.noDelay ?? true;
const connectTimeoutMS = options.connectTimeoutMS ?? 30000;
const rejectUnauthorized = options.rejectUnauthorized ?? true;
const existingSocket = options.existingSocket;

let socket: Stream;
Expand Down Expand Up @@ -375,10 +374,6 @@ export async function makeSocket(options: MakeConnectionOptions): Promise<Stream
return socket;
} catch (error) {
socket.destroy();
if ('authorizationError' in socket && socket.authorizationError != null && rejectUnauthorized) {
// TODO(NODE-5192): wrap this with a MongoError subclass
throw socket.authorizationError;
}
throw error;
} finally {
socket.setTimeout(0);
Expand Down
70 changes: 27 additions & 43 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { type Readable, Transform, type TransformCallback } from 'stream';
import { clearTimeout, setTimeout } from 'timers';
import { promisify } from 'util';

import type { BSONSerializeOptions, Document, ObjectId } from '../bson';
import type { AutoEncrypter } from '../client-side-encryption/auto_encrypter';
Expand Down Expand Up @@ -37,7 +36,7 @@ import {
maxWireVersion,
type MongoDBNamespace,
now,
promiseWithResolvers,
once,
uuidV4
} from '../utils';
import type { WriteConcern } from '../write_concern';
Expand Down Expand Up @@ -119,6 +118,8 @@ export interface ConnectionOptions
cancellationToken?: CancellationToken;
metadata: ClientMetadata;
/** @internal */
extendedMetadata: Promise<Document>;
/** @internal */
mongoLogger?: MongoLogger | undefined;
}

Expand Down Expand Up @@ -180,18 +181,18 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
* Once connection is established, command logging can log events (if enabled)
*/
public established: boolean;
/** Indicates that the connection (including underlying TCP socket) has been closed. */
public closed = false;

private lastUseTime: number;
private clusterTime: Document | null = null;
private error: Error | null = null;
private dataEvents: AsyncGenerator<Buffer, void, void> | null = null;

private readonly socketTimeoutMS: number;
private readonly monitorCommands: boolean;
private readonly socket: Stream;
private readonly controller: AbortController;
private readonly signal: AbortSignal;
private readonly messageStream: Readable;
private readonly socketWrite: (buffer: Uint8Array) => Promise<void>;
private readonly aborted: Promise<never>;

/** @event */
static readonly COMMAND_STARTED = COMMAND_STARTED;
Expand All @@ -211,6 +212,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
constructor(stream: Stream, options: ConnectionOptions) {
super();

this.socket = stream;
this.id = options.id;
this.address = streamIdentifier(stream, options);
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
Expand All @@ -223,39 +225,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.generation = options.generation;
this.lastUseTime = now();

this.socket = stream;

// TODO: Remove signal from connection layer
this.controller = new AbortController();
const { signal } = this.controller;
this.signal = signal;
const { promise: aborted, reject } = promiseWithResolvers<never>();
aborted.then(undefined, () => null); // Prevent unhandled rejection
this.signal.addEventListener(
'abort',
function onAbort() {
reject(signal.reason);
},
{ once: true }
);
this.aborted = aborted;

this.messageStream = this.socket
.on('error', this.onError.bind(this))
.pipe(new SizedMessageTransform({ connection: this }))
.on('error', this.onError.bind(this));
this.socket.on('close', this.onClose.bind(this));
this.socket.on('timeout', this.onTimeout.bind(this));

const socketWrite = promisify(this.socket.write.bind(this.socket));
this.socketWrite = async buffer => {
return Promise.race([socketWrite(buffer), this.aborted]);
};
}

/** Indicates that the connection (including underlying TCP socket) has been closed. */
public get closed(): boolean {
return this.signal.aborted;
}

public get hello() {
Expand Down Expand Up @@ -306,7 +281,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.lastUseTime = now();
}

public onError(error?: Error) {
public onError(error: Error) {
this.cleanup(error);
}

Expand Down Expand Up @@ -349,13 +324,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
*
* This method does nothing if the connection is already closed.
*/
private cleanup(error?: Error): void {
private cleanup(error: Error): void {
if (this.closed) {
return;
}

this.socket.destroy();
this.controller.abort(error);
this.error = error;
this.dataEvents?.throw(error).then(undefined, () => null); // squash unhandled rejection
this.closed = true;
this.emit(Connection.CLOSE);
}

Expand Down Expand Up @@ -596,7 +573,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

private throwIfAborted() {
this.signal.throwIfAborted();
if (this.error) throw this.error;
}

/**
Expand All @@ -619,7 +596,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

const buffer = Buffer.concat(await finalCommand.toBin());

return this.socketWrite(buffer);
if (this.socket.write(buffer)) return;
return once(this.socket, 'drain');
}

/**
Expand All @@ -632,13 +610,19 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
private async *readMany(): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
for await (const message of onData(this.messageStream, { signal: this.signal })) {
const response = await decompressResponse(message);
yield response;
try {
this.dataEvents = onData(this.messageStream);
for await (const message of this.dataEvents) {
const response = await decompressResponse(message);
yield response;

if (!response.moreToCome) {
return;
if (!response.moreToCome) {
return;
}
}
} finally {
this.dataEvents = null;
this.throwIfAborted();
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
maxIdleTimeMS: options.maxIdleTimeMS ?? 0,
waitQueueTimeoutMS: options.waitQueueTimeoutMS ?? 0,
minPoolSizeCheckFrequencyMS: options.minPoolSizeCheckFrequencyMS ?? 100,
autoEncrypter: options.autoEncrypter,
metadata: options.metadata
autoEncrypter: options.autoEncrypter
});

if (this.options.minPoolSize > this.options.maxPoolSize) {
Expand Down
58 changes: 54 additions & 4 deletions src/cmap/handshake/client_metadata.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { promises as fs } from 'fs';
import * as os from 'os';
import * as process from 'process';

import { BSON, Int32 } from '../../bson';
import { BSON, type Document, Int32 } from '../../bson';
import { MongoInvalidArgumentError } from '../../error';
import type { MongoOptions } from '../../mongo_client';

Expand Down Expand Up @@ -71,13 +72,13 @@ export class LimitedSizeDocument {
return true;
}

toObject(): ClientMetadata {
toObject(): Document {
return BSON.deserialize(BSON.serialize(this.document), {
promoteLongs: false,
promoteBuffers: false,
promoteValues: false,
useBigInt64: false
}) as ClientMetadata;
});
}
}

Expand Down Expand Up @@ -152,8 +153,57 @@ export function makeClientMetadata(options: MakeClientMetadataOptions): ClientMe
}
}
}
return metadataDocument.toObject() as ClientMetadata;
}

let dockerPromise: Promise<boolean>;
/** @internal */
async function getContainerMetadata() {
const containerMetadata: Record<string, any> = {};
dockerPromise ??= fs.access('/.dockerenv').then(
() => true,
() => false
);
const isDocker = await dockerPromise;

const { KUBERNETES_SERVICE_HOST = '' } = process.env;
const isKubernetes = KUBERNETES_SERVICE_HOST.length > 0 ? true : false;

if (isDocker) containerMetadata.runtime = 'docker';
if (isKubernetes) containerMetadata.orchestrator = 'kubernetes';

return containerMetadata;
}

/**
* @internal
* Re-add each metadata value.
* Attempt to add new env container metadata, but keep old data if it does not fit.
*/
export async function addContainerMetadata(originalMetadata: ClientMetadata) {
const containerMetadata = await getContainerMetadata();
if (Object.keys(containerMetadata).length === 0) return originalMetadata;

const extendedMetadata = new LimitedSizeDocument(512);

const extendedEnvMetadata = { ...originalMetadata?.env, container: containerMetadata };

for (const [key, val] of Object.entries(originalMetadata)) {
if (key !== 'env') {
extendedMetadata.ifItFitsItSits(key, val);
} else {
if (!extendedMetadata.ifItFitsItSits('env', extendedEnvMetadata)) {
// add in old data if newer / extended metadata does not fit
extendedMetadata.ifItFitsItSits('env', val);
}
}
}

if (!('env' in originalMetadata)) {
extendedMetadata.ifItFitsItSits('env', extendedEnvMetadata);
}

return metadataDocument.toObject();
return extendedMetadata.toObject();
}

/**
Expand Down
18 changes: 2 additions & 16 deletions src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ type PendingPromises = Omit<
* https://nodejs.org/api/events.html#eventsonemitter-eventname-options
*
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
* It will reject upon an error event or if the provided signal is aborted.
* It will reject upon an error event.
*/
export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) {
const signal = options.signal;

export function onData(emitter: EventEmitter) {
// Setup pending events and pending promise lists
/**
* When the caller has not yet called .next(), we store the
Expand Down Expand Up @@ -89,19 +87,8 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal })
emitter.on('data', eventHandler);
emitter.on('error', errorHandler);

if (signal.aborted) {
// If the signal is aborted, set up the first .next() call to be a rejection
queueMicrotask(abortListener);
} else {
signal.addEventListener('abort', abortListener, { once: true });
}

return iterator;

function abortListener() {
errorHandler(signal.reason);
}

function eventHandler(value: Buffer) {
const promise = unconsumedPromises.shift();
if (promise != null) promise.resolve({ value, done: false });
Expand All @@ -119,7 +106,6 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal })
// Adding event handlers
emitter.off('data', eventHandler);
emitter.off('error', errorHandler);
signal.removeEventListener('abort', abortListener);
finished = true;
const doneResult = { value: undefined, done: finished } as const;

Expand Down
Loading

0 comments on commit fd31d37

Please sign in to comment.