Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6338): implement client bulk write error handling #4262

Merged
merged 29 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/cmap/wire_protocol/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,4 +354,8 @@ export class ClientBulkWriteCursorResponse extends CursorResponse {
get deletedCount() {
return this.get('nDeleted', BSONType.int, true);
}

get writeConcernError() {
return this.get('writeConcernError', BSONType.object, false);
}
}
10 changes: 2 additions & 8 deletions src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { type Document } from 'bson';

import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
import { MongoClientBulkWriteCursorError } from '../error';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
import { type ClientBulkWriteCommandBuilder } from '../operations/client_bulk_write/command_builder';
Expand Down Expand Up @@ -48,16 +47,11 @@ export class ClientBulkWriteCursor extends AbstractCursor {
* We need a way to get the top level cursor response fields for
* generating the bulk write result, so we expose this here.
*/
get response(): ClientBulkWriteCursorResponse {
get response(): ClientBulkWriteCursorResponse | null {
if (this.cursorResponse) return this.cursorResponse;
throw new MongoClientBulkWriteCursorError(
'No client bulk write cursor response returned from the server.'
);
return null;
}

/**
* Get the last set of operations the cursor executed.
*/
get operations(): Document[] {
return this.commandBuilder.lastOperations;
}
Expand Down
46 changes: 44 additions & 2 deletions src/error.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import type { Document } from './bson';
import {
type ClientBulkWriteError,
type ClientBulkWriteResult
} from './operations/client_bulk_write/common';
import type { ServerType } from './sdam/common';
import type { TopologyVersion } from './sdam/server_description';
import type { TopologyDescription } from './sdam/topology_description';
Expand Down Expand Up @@ -616,6 +620,44 @@ export class MongoGCPError extends MongoOIDCError {
}
}

/**
* An error indicating that an error occurred when executing the bulk write.
*
* @public
* @category Error
*/
export class MongoClientBulkWriteError extends MongoServerError {
/**
* Write concern errors that occurred while executing the bulk write. This list may have
* multiple items if more than one server command was required to execute the bulk write.
*/
writeConcernErrors: Document[];
/**
* Errors that occurred during the execution of individual write operations. This map will
* contain at most one entry if the bulk write was ordered.
*/
writeErrors: Map<number, ClientBulkWriteError>;
/**
* The results of any successful operations that were performed before the error was
* encountered.
*/
partialResult?: ClientBulkWriteResult;

/**
* Initialize the client bulk write error.
* @param message - The error message.
*/
constructor(message: ErrorDescription) {
super(message);
this.writeConcernErrors = [];
this.writeErrors = new Map();
}

override get name(): string {
return 'MongoClientBulkWriteError';
}
}

/**
* An error indicating that an error occurred when processing bulk write results.
*
Expand Down Expand Up @@ -1047,8 +1089,8 @@ export class MongoInvalidArgumentError extends MongoAPIError {
*
* @public
**/
constructor(message: string) {
super(message);
constructor(message: string, options?: { cause?: Error }) {
super(message, options);
}

override get name(): string {
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export {
MongoBatchReExecutionError,
MongoChangeStreamError,
MongoClientBulkWriteCursorError,
MongoClientBulkWriteError,
MongoClientBulkWriteExecutionError,
MongoCompatibilityError,
MongoCursorExhaustedError,
Expand Down Expand Up @@ -477,6 +478,7 @@ export type {
} from './operations/aggregate';
export type {
AnyClientBulkWriteModel,
ClientBulkWriteError,
ClientBulkWriteOptions,
ClientBulkWriteResult,
ClientDeleteManyModel,
Expand Down
5 changes: 5 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
models: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
): Promise<ClientBulkWriteResult | { ok: 1 }> {
if (this.autoEncrypter) {
throw new MongoInvalidArgumentError(
'MongoClient bulkWrite does not currently support automatic encryption.'
);
}
return await new ClientBulkWriteExecutor(this, models, options).execute();
}

Expand Down
46 changes: 36 additions & 10 deletions src/operations/client_bulk_write/client_bulk_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
this.ns = new MongoDBNamespace('admin', '$cmd');
}

override resetBatch(): boolean {
return this.commandBuilder.resetBatch();
}

override get canRetryWrite(): boolean {
return this.commandBuilder.isBatchRetryable;
}

/**
* Execute the command. Superclass will handle write concern, etc.
* @param server - The server.
Expand All @@ -41,14 +49,20 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu

if (server.description.type === ServerType.LoadBalancer) {
if (session) {
// Checkout a connection to build the command.
const connection = await server.pool.checkOut();
// Pin the connection to the session so it get used to execute the command and we do not
// perform a double check-in/check-out.
session.pin(connection);
let connection;
if (!session.pinnedConnection) {
// Checkout a connection to build the command.
connection = await server.pool.checkOut();
// Pin the connection to the session so it get used to execute the command and we do not
// perform a double check-in/check-out.
session.pin(connection);
} else {
connection = session.pinnedConnection;
}
command = this.commandBuilder.buildBatch(
connection.hello?.maxMessageSizeBytes,
connection.hello?.maxWriteBatchSize
connection.hello?.maxWriteBatchSize,
connection.hello?.maxBsonObjectSize
);
} else {
throw new MongoClientBulkWriteExecutionError(
Expand All @@ -59,16 +73,26 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
// At this point we have a server and the auto connect code has already
// run in executeOperation, so the server description will be populated.
// We can use that to build the command.
if (!server.description.maxWriteBatchSize || !server.description.maxMessageSizeBytes) {
if (
!server.description.maxWriteBatchSize ||
!server.description.maxMessageSizeBytes ||
!server.description.maxBsonObjectSize
) {
throw new MongoClientBulkWriteExecutionError(
'In order to execute a client bulk write, both maxWriteBatchSize and maxMessageSizeBytes must be provided by the servers hello response.'
'In order to execute a client bulk write, both maxWriteBatchSize, maxMessageSizeBytes and maxBsonObjectSize must be provided by the servers hello response.'
);
}
command = this.commandBuilder.buildBatch(
server.description.maxMessageSizeBytes,
server.description.maxWriteBatchSize
server.description.maxWriteBatchSize,
server.description.maxBsonObjectSize
);
}

// Check after the batch is built if we cannot retry it and override the option.
if (!this.canRetryWrite) {
this.options.willRetryWrite = false;
}
return await super.executeCommand(server, session, command, ClientBulkWriteCursorResponse);
}
}
Expand All @@ -77,5 +101,7 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
defineAspects(ClientBulkWriteOperation, [
Aspect.WRITE_OPERATION,
Aspect.SKIP_COLLATION,
Aspect.CURSOR_CREATING
Aspect.CURSOR_CREATING,
Aspect.RETRYABLE,
Aspect.COMMAND_BATCHING
]);
89 changes: 84 additions & 5 deletions src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { BSON, type Document } from '../../bson';
import { DocumentSequence } from '../../cmap/commands';
import { MongoAPIError, MongoInvalidArgumentError } from '../../error';
import { type PkFactory } from '../../mongo_client';
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
import { DEFAULT_PK_FACTORY } from '../../utils';
import { DEFAULT_PK_FACTORY, hasAtomicOperators } from '../../utils';
import { type CollationOptions } from '../command';
import { type Hint } from '../operation';
import type {
Expand Down Expand Up @@ -38,8 +39,14 @@ export class ClientBulkWriteCommandBuilder {
models: AnyClientBulkWriteModel[];
options: ClientBulkWriteOptions;
pkFactory: PkFactory;
/** The current index in the models array that is being processed. */
currentModelIndex: number;
/** The model index that the builder was on when it finished the previous batch. Used for resets when retrying. */
previousModelIndex: number;
/** The last array of operations that were created. Used by the results merger for indexing results. */
lastOperations: Document[];
/** Returns true if the current batch being created has no multi-updates. */
isBatchRetryable: boolean;

/**
* Create the command builder.
Expand All @@ -54,7 +61,9 @@ export class ClientBulkWriteCommandBuilder {
this.options = options;
this.pkFactory = pkFactory ?? DEFAULT_PK_FACTORY;
this.currentModelIndex = 0;
this.previousModelIndex = 0;
this.lastOperations = [];
this.isBatchRetryable = true;
}

/**
Expand All @@ -76,27 +85,57 @@ export class ClientBulkWriteCommandBuilder {
return this.currentModelIndex < this.models.length;
}

/**
* When we need to retry a command we need to set the current
* model index back to its previous value.
*/
resetBatch(): boolean {
this.currentModelIndex = this.previousModelIndex;
return true;
}

/**
* Build a single batch of a client bulk write command.
* @param maxMessageSizeBytes - The max message size in bytes.
* @param maxWriteBatchSize - The max write batch size.
* @returns The client bulk write command.
*/
buildBatch(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand {
buildBatch(
maxMessageSizeBytes: number,
maxWriteBatchSize: number,
maxBsonObjectSize: number
): ClientBulkWriteCommand {
// We start by assuming the batch has no multi-updates, so it is retryable
// until we find them.
this.isBatchRetryable = true;
let commandLength = 0;
let currentNamespaceIndex = 0;
const command: ClientBulkWriteCommand = this.baseCommand();
const namespaces = new Map<string, number>();
// In the case of retries we need to mark where we started this batch.
this.previousModelIndex = this.currentModelIndex;

while (this.currentModelIndex < this.models.length) {
const model = this.models[this.currentModelIndex];
const ns = model.namespace;
const nsIndex = namespaces.get(ns);

// Multi updates are not retryable.
if (model.name === 'deleteMany' || model.name === 'updateMany') {
this.isBatchRetryable = false;
}

if (nsIndex != null) {
// Build the operation and serialize it to get the bytes buffer.
const operation = buildOperation(model, nsIndex, this.pkFactory);
const operationBuffer = BSON.serialize(operation);
let operationBuffer;
try {
operationBuffer = BSON.serialize(operation);
} catch (cause) {
throw new MongoInvalidArgumentError(`Could not serialize operation to BSON`, { cause });
}

validateBufferSize('ops', operationBuffer, maxBsonObjectSize);

// Check if the operation buffer can fit in the command. If it can,
// then add the operation to the document sequence and increment the
Expand All @@ -119,9 +158,18 @@ export class ClientBulkWriteCommandBuilder {
// construct our nsInfo and ops documents and buffers.
namespaces.set(ns, currentNamespaceIndex);
const nsInfo = { ns: ns };
const nsInfoBuffer = BSON.serialize(nsInfo);
const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory);
const operationBuffer = BSON.serialize(operation);
let nsInfoBuffer;
let operationBuffer;
try {
nsInfoBuffer = BSON.serialize(nsInfo);
operationBuffer = BSON.serialize(operation);
} catch (cause) {
throw new MongoInvalidArgumentError(`Could not serialize ns info to BSON`, { cause });
}

validateBufferSize('nsInfo', nsInfoBuffer, maxBsonObjectSize);
validateBufferSize('ops', operationBuffer, maxBsonObjectSize);

// Check if the operation and nsInfo buffers can fit in the command. If they
// can, then add the operation and nsInfo to their respective document
Expand Down Expand Up @@ -179,6 +227,14 @@ export class ClientBulkWriteCommandBuilder {
}
}

function validateBufferSize(name: string, buffer: Uint8Array, maxBsonObjectSize: number) {
if (buffer.length > maxBsonObjectSize) {
throw new MongoInvalidArgumentError(
`Client bulk write operation ${name} of length ${buffer.length} exceeds the max bson object size of ${maxBsonObjectSize}`
);
}
}

/** @internal */
interface ClientInsertOperation {
insert: number;
Expand Down Expand Up @@ -293,6 +349,18 @@ export const buildUpdateManyOperation = (
return createUpdateOperation(model, index, true);
};

/**
* Validate the update document.
* @param update - The update document.
*/
function validateUpdate(update: Document) {
if (!hasAtomicOperators(update)) {
throw new MongoAPIError(
'Client bulk write update models must only contain atomic modifiers (start with $) and must not be empty.'
);
}
}

/**
* Creates a delete operation based on the parameters.
*/
Expand All @@ -301,6 +369,11 @@ function createUpdateOperation(
index: number,
multi: boolean
): ClientUpdateOperation {
// Update documents provided in UpdateOne and UpdateMany write models are
// required only to contain atomic modifiers (i.e. keys that start with "$").
// Drivers MUST throw an error if an update document is empty or if the
// document's first key does not start with "$".
validateUpdate(model.update);
const document: ClientUpdateOperation = {
update: index,
multi: multi,
Expand Down Expand Up @@ -343,6 +416,12 @@ export const buildReplaceOneOperation = (
model: ClientReplaceOneModel,
index: number
): ClientReplaceOneOperation => {
if (hasAtomicOperators(model.replacement)) {
throw new MongoAPIError(
'Client bulk write replace models must not contain atomic modifiers (start with $) and must not be empty.'
);
}

const document: ClientReplaceOneOperation = {
update: index,
multi: false,
Expand Down
6 changes: 6 additions & 0 deletions src/operations/client_bulk_write/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ export interface ClientBulkWriteResult {
deleteResults?: Map<number, ClientDeleteResult>;
}

/** @public */
export interface ClientBulkWriteError {
code: number;
message: string;
}

/** @public */
export interface ClientInsertOneResult {
/**
Expand Down
Loading