Skip to content

Commit

Permalink
feat(NODE-6338): implement client bulk write error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Oct 3, 2024
1 parent d56e235 commit 0a55ac2
Show file tree
Hide file tree
Showing 19 changed files with 852 additions and 180 deletions.
4 changes: 4 additions & 0 deletions src/cmap/wire_protocol/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,4 +354,8 @@ export class ClientBulkWriteCursorResponse extends CursorResponse {
get deletedCount() {
return this.get('nDeleted', BSONType.int, true);
}

get writeConcernError() {
return this.get('writeConcernError', BSONType.object, false);
}
}
10 changes: 2 additions & 8 deletions src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { type Document } from 'bson';

import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
import { MongoClientBulkWriteCursorError } from '../error';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
import { type ClientBulkWriteCommandBuilder } from '../operations/client_bulk_write/command_builder';
Expand Down Expand Up @@ -48,16 +47,11 @@ export class ClientBulkWriteCursor extends AbstractCursor {
* We need a way to get the top level cursor response fields for
* generating the bulk write result, so we expose this here.
*/
get response(): ClientBulkWriteCursorResponse {
get response(): ClientBulkWriteCursorResponse | null {
if (this.cursorResponse) return this.cursorResponse;
throw new MongoClientBulkWriteCursorError(
'No client bulk write cursor response returned from the server.'
);
return null;
}

/**
* Get the last set of operations the cursor executed.
*/
get operations(): Document[] {
return this.commandBuilder.lastOperations;
}
Expand Down
27 changes: 27 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,33 @@ export class MongoClientBulkWriteCursorError extends MongoRuntimeError {
}
}

/**
* An error indicating that an error occurred when generating a bulk write update.
*
* @public
* @category Error
*/
export class MongoClientBulkWriteUpdateError extends MongoRuntimeError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor(message: string) {
super(message);
}

override get name(): string {
return 'MongoClientBulkWriteUpdateError';
}
}

/**
* An error indicating that an error occurred on the client when executing a client bulk write.
*
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export {
MongoChangeStreamError,
MongoClientBulkWriteCursorError,
MongoClientBulkWriteExecutionError,
MongoClientBulkWriteUpdateError,
MongoCompatibilityError,
MongoCursorExhaustedError,
MongoCursorInUseError,
Expand Down
5 changes: 5 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
models: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
): Promise<ClientBulkWriteResult | { ok: 1 }> {
if (this.autoEncrypter) {
throw new MongoInvalidArgumentError(
'MongoClient bulkWrite does not currently support automatic encryption.'
);
}
return await new ClientBulkWriteExecutor(this, models, options).execute();
}

Expand Down
14 changes: 10 additions & 4 deletions src/operations/client_bulk_write/client_bulk_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
session.pin(connection);
command = this.commandBuilder.buildBatch(
connection.hello?.maxMessageSizeBytes,
connection.hello?.maxWriteBatchSize
connection.hello?.maxWriteBatchSize,
connection.hello?.maxBsonObjectSize
);
} else {
throw new MongoClientBulkWriteExecutionError(
Expand All @@ -59,14 +60,19 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
// At this point we have a server and the auto connect code has already
// run in executeOperation, so the server description will be populated.
// We can use that to build the command.
if (!server.description.maxWriteBatchSize || !server.description.maxMessageSizeBytes) {
if (
!server.description.maxWriteBatchSize ||
!server.description.maxMessageSizeBytes ||
!server.description.maxBsonObjectSize
) {
throw new MongoClientBulkWriteExecutionError(
'In order to execute a client bulk write, both maxWriteBatchSize and maxMessageSizeBytes must be provided by the servers hello response.'
'In order to execute a client bulk write, both maxWriteBatchSize, maxMessageSizeBytes and maxBsonObjectSize must be provided by the servers hello response.'
);
}
command = this.commandBuilder.buildBatch(
server.description.maxMessageSizeBytes,
server.description.maxWriteBatchSize
server.description.maxWriteBatchSize,
server.description.maxBsonObjectSize
);
}
return await super.executeCommand(server, session, command, ClientBulkWriteCursorResponse);
Expand Down
94 changes: 90 additions & 4 deletions src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { BSON, type Document } from '../../bson';
import { DocumentSequence } from '../../cmap/commands';
import { MongoClientBulkWriteUpdateError, MongoInvalidArgumentError } from '../../error';
import { type PkFactory } from '../../mongo_client';
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
import { DEFAULT_PK_FACTORY } from '../../utils';
Expand Down Expand Up @@ -82,7 +83,11 @@ export class ClientBulkWriteCommandBuilder {
* @param maxWriteBatchSize - The max write batch size.
* @returns The client bulk write command.
*/
buildBatch(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand {
buildBatch(
maxMessageSizeBytes: number,
maxWriteBatchSize: number,
maxBsonObjectSize: number
): ClientBulkWriteCommand {
let commandLength = 0;
let currentNamespaceIndex = 0;
const command: ClientBulkWriteCommand = this.baseCommand();
Expand All @@ -96,7 +101,16 @@ export class ClientBulkWriteCommandBuilder {
if (nsIndex != null) {
// Build the operation and serialize it to get the bytes buffer.
const operation = buildOperation(model, nsIndex, this.pkFactory);
const operationBuffer = BSON.serialize(operation);
let operationBuffer;
try {
operationBuffer = BSON.serialize(operation);
} catch (error) {
throw new MongoInvalidArgumentError(
`Could not serialize operation to BSON: ${error.message}.`
);
}

validateBufferSize('ops', operationBuffer, maxBsonObjectSize, maxMessageSizeBytes);

// Check if the operation buffer can fit in the command. If it can,
// then add the operation to the document sequence and increment the
Expand All @@ -119,9 +133,20 @@ export class ClientBulkWriteCommandBuilder {
// construct our nsInfo and ops documents and buffers.
namespaces.set(ns, currentNamespaceIndex);
const nsInfo = { ns: ns };
const nsInfoBuffer = BSON.serialize(nsInfo);
const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory);
const operationBuffer = BSON.serialize(operation);
let nsInfoBuffer;
let operationBuffer;
try {
nsInfoBuffer = BSON.serialize(nsInfo);
operationBuffer = BSON.serialize(operation);
} catch (error) {
throw new MongoInvalidArgumentError(
`Could not serialize ns info and operation to BSON: ${error.message}.`
);
}

validateBufferSize('nsInfo', nsInfoBuffer, maxBsonObjectSize, maxMessageSizeBytes);
validateBufferSize('ops', operationBuffer, maxBsonObjectSize, maxMessageSizeBytes);

// Check if the operation and nsInfo buffers can fit in the command. If they
// can, then add the operation and nsInfo to their respective document
Expand Down Expand Up @@ -179,6 +204,25 @@ export class ClientBulkWriteCommandBuilder {
}
}

function validateBufferSize(
name: string,
buffer: Uint8Array,
maxBsonObjectSize: number,
maxMessageSizeBytes: number
) {
if (buffer.length > maxBsonObjectSize) {
throw new MongoInvalidArgumentError(
`Client bulk write operation ${name} of length ${buffer.length} exceeds the max bson object size of ${maxBsonObjectSize}`
);
}

if (buffer.length > maxMessageSizeBytes) {
throw new MongoInvalidArgumentError(
`Client bulk write operation ${name} of length ${buffer.length} exceeds the max message size size of ${maxMessageSizeBytes}`
);
}
}

/** @internal */
interface ClientInsertOperation {
insert: number;
Expand Down Expand Up @@ -293,6 +337,22 @@ export const buildUpdateManyOperation = (
return createUpdateOperation(model, index, true);
};

/**
* Validate the update document.
* @param update - The update document.
*/
function validateUpdate(update: Document) {
const keys = Object.keys(update);
if (keys.length === 0) {
throw new MongoClientBulkWriteUpdateError('Client bulk write update models may not be empty.');
}
if (!keys[0].startsWith('$')) {
throw new MongoClientBulkWriteUpdateError(
'Client bulk write update models must only contain atomic modifiers (start with $).'
);
}
}

/**
* Creates a delete operation based on the parameters.
*/
Expand All @@ -301,6 +361,22 @@ function createUpdateOperation(
index: number,
multi: boolean
): ClientUpdateOperation {
// Update documents provided in UpdateOne and UpdateMany write models are
// required only to contain atomic modifiers (i.e. keys that start with "$").
// Drivers MUST throw an error if an update document is empty or if the
// document's first key does not start with "$".
if (Array.isArray(model.update)) {
if (model.update.length === 0) {
throw new MongoClientBulkWriteUpdateError(
'Client bulk write update model pipelines may not be empty.'
);
}
for (const update of model.update) {
validateUpdate(update);
}
} else {
validateUpdate(model.update);
}
const document: ClientUpdateOperation = {
update: index,
multi: multi,
Expand Down Expand Up @@ -343,6 +419,16 @@ export const buildReplaceOneOperation = (
model: ClientReplaceOneModel,
index: number
): ClientReplaceOneOperation => {
const keys = Object.keys(model.replacement);
if (keys.length === 0) {
throw new MongoClientBulkWriteUpdateError('Client bulk write replace models may not be empty.');
}
if (keys[0].startsWith('$')) {
throw new MongoClientBulkWriteUpdateError(
'Client bulk write replace models must not contain atomic modifiers (start with $).'
);
}

const document: ClientReplaceOneOperation = {
update: index,
multi: false,
Expand Down
50 changes: 50 additions & 0 deletions src/operations/client_bulk_write/common.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { type Document } from '../../bson';
import { type ErrorDescription, type MongoRuntimeError, MongoServerError } from '../../error';
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
import type { CollationOptions, CommandOperationOptions } from '../../operations/command';
import type { Hint } from '../../operations/operation';
Expand Down Expand Up @@ -181,6 +182,55 @@ export interface ClientBulkWriteResult {
deleteResults?: Map<number, ClientDeleteResult>;
}

export interface ClientBulkWriteError {
code: number;
message: string;
}

/**
* An error indicating that an error occurred when executing the bulk write.
*
* @public
* @category Error
*/
export class MongoClientBulkWriteError extends MongoServerError {
/**
* A top-level error that occurred when attempting to communicate with the server or execute
* the bulk write. This value may not be populated if the exception was thrown due to errors
* occurring on individual writes.
*/
error?: MongoRuntimeError;
/**
* Write concern errors that occurred while executing the bulk write. This list may have
* multiple items if more than one server command was required to execute the bulk write.
*/
writeConcernErrors: Document[];
/**
* Errors that occurred during the execution of individual write operations. This map will
* contain at most one entry if the bulk write was ordered.
*/
writeErrors: Map<number, ClientBulkWriteError>;
/**
* The results of any successful operations that were performed before the error was
* encountered.
*/
partialResult?: ClientBulkWriteResult;

/**
* Initialize the client bulk write error.
* @param message - The error message.
*/
constructor(message: ErrorDescription) {
super(message);
this.writeConcernErrors = [];
this.writeErrors = new Map();
}

override get name(): string {
return 'MongoClientBulkWriteError';
}
}

/** @public */
export interface ClientInsertOneResult {
/**
Expand Down
Loading

0 comments on commit 0a55ac2

Please sign in to comment.