Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions sdk/cosmosdb/cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## 3.17.3 (Unreleased)

### Features Added
- Changes in bulk API to honour size restrictions (i.e. 2 MB) while creating individual batches. [#23923](https://github.com/Azure/azure-sdk-for-js/issues/23923)
### Breaking Changes

### Bugs Fixed

### Other Changes
# Release History

## 3.17.2 (2022-11-15)
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmosdb/cosmos/review/cosmos.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ export const Constants: {
AzurePackageName: string;
SDKName: string;
SDKVersion: string;
DefaultMaxBulkRequestBodySizeInBytes: number;
Quota: {
CollectionSize: string;
};
Expand Down
2 changes: 2 additions & 0 deletions sdk/cosmosdb/cosmos/src/client/Item/Items.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
OperationInput,
BulkOptions,
decorateBatchOperation,
splitBatchBasedOnBodySize,
} from "../../utils/batch";
import { hashV1PartitionKey } from "../../utils/hashing/v1";
import { hashV2PartitionKey } from "../../utils/hashing/v2";
Expand Down Expand Up @@ -438,6 +439,7 @@ export class Items {
await Promise.all(
batches
.filter((batch: Batch) => batch.operations.length)
.flatMap((batch: Batch) => splitBatchBasedOnBodySize(batch))
.map(async (batch: Batch) => {
if (batch.operations.length > 100) {
throw new Error("Cannot run bulk request with more than 100 operations per partition");
Expand Down
3 changes: 3 additions & 0 deletions sdk/cosmosdb/cosmos/src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ export const Constants = {
SDKName: "azure-cosmos-js",
SDKVersion: "3.17.2",

// Bulk Operations
DefaultMaxBulkRequestBodySizeInBytes: 220201,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this specific size?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have taken this constant from the Java and .NET SDKs. It is slightly lower than 2 MB, but this seems to be the value used by both SDKs.


Quota: {
CollectionSize: "collectionSize",
},
Expand Down
49 changes: 49 additions & 0 deletions sdk/cosmosdb/cosmos/src/utils/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { PartitionKeyDefinition } from "../documents";
import { RequestOptions } from "..";
import { PatchRequestBody } from "./patch";
import { v4 } from "uuid";
import { bodyFromData } from "../request/request";
import { Constants } from "../common/constants";
const uuid = v4;

export type Operation =
Expand Down Expand Up @@ -210,6 +212,53 @@ export function decorateOperation(
return operation as Operation;
}

/**
* Splits a batch into array of batches based on cumulative size of its operations by making sure
* cumulative size of an individual batch is not larger than {@link Constants.DefaultMaxBulkRequestBodySizeInBytes}.
* If a single operation itself is larger than {@link Constants.DefaultMaxBulkRequestBodySizeInBytes}, that
* operation would be moved into a batch containing only that operation.
* @param originalBatch - A batch of operations needed to be checked.
* @returns
* @hidden
*/
export function splitBatchBasedOnBodySize(originalBatch: Batch): Batch[] {
  const operations = originalBatch?.operations;
  if (operations === undefined || operations.length < 1) return [];

  const processedBatches: Batch[] = [];
  let currentBatch: Batch | undefined;
  let currentBatchSize = 0;

  for (let index = 0; index < operations.length; index++) {
    const operation = operations[index];
    const operationSize = calculateObjectSizeInBytes(operation);
    // Open a fresh batch for the very first operation, or whenever appending the
    // current operation would push the running batch past the size limit. An
    // oversized single operation therefore always lands in a batch of its own.
    if (
      currentBatch === undefined ||
      currentBatchSize + operationSize > Constants.DefaultMaxBulkRequestBodySizeInBytes
    ) {
      currentBatch = { ...originalBatch, operations: [], indexes: [] };
      processedBatches.push(currentBatch);
      currentBatchSize = 0;
    }
    currentBatch.operations.push(operation);
    currentBatch.indexes.push(originalBatch.indexes[index]);
    currentBatchSize += operationSize;
  }
  return processedBatches;
}

/**
* Calculates size of an JSON object in bytes with utf-8 encoding.
* @hidden
*/
export function calculateObjectSizeInBytes(obj: unknown): number {
return new TextEncoder().encode(bodyFromData(obj as any)).length;
Comment thread
witemple-msft marked this conversation as resolved.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just leaving a note that this feels expensive. You're basically encoding the body into a buffer, which the request pipeline must do anyway, for the sole purpose of measuring the body's length. I'm not sure if there's a great alternative to doing this to be honest, but it sticks out at me as being a costly operation.

}

export function decorateBatchOperation(
operation: OperationInput,
options: RequestOptions = {}
Expand Down
138 changes: 137 additions & 1 deletion sdk/cosmosdb/cosmos/test/internal/unit/utils/batch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@
// Licensed under the MIT license.

import assert from "assert";
import { deepFind } from "../../../../src/utils/batch";
import { Constants } from "../../../../src";
import {
Batch,
BulkOperationType,
calculateObjectSizeInBytes,
deepFind,
Operation,
splitBatchBasedOnBodySize,
} from "../../../../src/utils/batch";

describe("batch utils", function () {
it("deep finds nested partition key values in objects", function () {
Expand All @@ -26,3 +34,131 @@ describe("batch utils", function () {
assert.equal(deepFind(testTwiceNested, "nested/nested2/key"), "value");
});
});

// Minimal Create operation used as the baseline for size calculations below.
const operationSkeleton: Operation = {
  operationType: BulkOperationType.Create,
  resourceBody: {
    value: "",
  },
};

// Serialized size in bytes of the skeleton — the smallest operation
// generateOperationOfSize can produce.
const constantSize = calculateObjectSizeInBytes(operationSkeleton);

/**
 * Builds a Create operation whose serialized size is approximately `sizeInBytes`,
 * padding `resourceBody.value` with repeated characters to reach the target.
 * @param sizeInBytes - Desired total serialized size of the generated operation in bytes.
 * @param attributes - Optional extra top-level properties merged into the operation.
 * @param partitionKey - Optional partition-key property spread into the resource body;
 * its estimated serialized size is subtracted from the padding.
 * @throws Error when `sizeInBytes` is smaller than the minimum possible operation size.
 */
export function generateOperationOfSize(
  sizeInBytes: number,
  attributes?: unknown,
  partitionKey?: { [P in string]: unknown }
): Operation {
  if (sizeInBytes < constantSize) {
    throw new Error(`Not possible to generate operation of size less than ${constantSize}`);
  }
  let sizeToAdd = sizeInBytes - constantSize;
  if (partitionKey !== undefined) {
    // NOTE(review): this measures {"partitionKey": ...} as a wrapper object, so the
    // adjustment for the spread partition-key property is approximate — confirm if a
    // test ever depends on the exact byte count.
    sizeToAdd -= calculateObjectSizeInBytes({ partitionKey }) + calculateObjectSizeInBytes({});
  }
  return {
    ...(attributes as any),
    operationType: BulkOperationType.Create,
    resourceBody: {
      // "a".repeat(n) is the idiomatic (and clearer) form of new Array(n + 1).join("a").
      value: "a".repeat(sizeToAdd),
      ...partitionKey,
    },
  };
}

describe("Test batch split based on size", function () {
  // Describes the operations fed into splitBatchBasedOnBodySize and the expected
  // shape of the result (number of batches and operations per batch).
  type BatchSplitTestCase = {
    inputOperationDescription: {
      operationSize: number;
      index: number;
    }[];
    resultingBatchDescription: {
      resultingBatchLength: number;
      resultingOperationsLengths: number[];
    };
  };

  // Builds a Batch from the test-case description, runs the splitter, and checks
  // both the number of resulting batches and the operation count of each.
  function runBatchSplitTestCase(t: BatchSplitTestCase): void {
    const inputBatch: Batch = {
      operations: t.inputOperationDescription.map((op) =>
        generateOperationOfSize(Math.floor(op.operationSize))
      ),
      min: "",
      max: "",
      rangeId: "",
      indexes: t.inputOperationDescription.map((op) => op.index),
    };
    const processedBatches: Batch[] = splitBatchBasedOnBodySize(inputBatch);
    assert.strictEqual(
      processedBatches.length,
      t.resultingBatchDescription.resultingBatchLength,
      `Should have split into ${t.resultingBatchDescription.resultingBatchLength} batches.`
    );
    t.resultingBatchDescription.resultingOperationsLengths.forEach((expectedLength, index) =>
      assert.strictEqual(
        processedBatches[index].operations.length,
        expectedLength,
        // Report the expected count, not the actual one, so a failure message is useful.
        `${index}th batch should have ${expectedLength} operations.`
      )
    );
  }

  it("For An empty batch, empty batch should be returned", function () {
    runBatchSplitTestCase({
      inputOperationDescription: [],
      resultingBatchDescription: {
        resultingBatchLength: 0,
        resultingOperationsLengths: [],
      },
    });
  });

  it("If all operations are cumulatively less than DefaultMaxBulkRequestBodySizeInBytes, Batch should not split", function () {
    runBatchSplitTestCase({
      inputOperationDescription: [...Array(20).keys()].map((index) => ({
        operationSize: Constants.DefaultMaxBulkRequestBodySizeInBytes / 100,
        index: index,
      })),
      resultingBatchDescription: {
        resultingBatchLength: 1,
        resultingOperationsLengths: [20],
      },
    });
  });
  it("20 operations with each 1/2 size of DefaultMaxBulkRequestBodySizeInBytes, should split in 10 batches.", function () {
    runBatchSplitTestCase({
      inputOperationDescription: [...Array(20).keys()].map((index) => ({
        operationSize: Constants.DefaultMaxBulkRequestBodySizeInBytes / 2,
        index: index,
      })),
      resultingBatchDescription: {
        resultingBatchLength: 10,
        resultingOperationsLengths: [...Array(10).keys()].map(() => 2),
      },
    });
  });
  // 20 ops of size limit/3 pack 3 per batch: 6 full batches plus one batch of 2.
  it("20 operations with each 1/3 size of DefaultMaxBulkRequestBodySizeInBytes, should split in 7 batches.", function () {
    runBatchSplitTestCase({
      inputOperationDescription: [...Array(20).keys()].map((index) => ({
        operationSize: Constants.DefaultMaxBulkRequestBodySizeInBytes / 3,
        index: index,
      })),
      resultingBatchDescription: {
        resultingBatchLength: 7,
        resultingOperationsLengths: [...[...Array(6).keys()].map(() => 3), 2],
      },
    });
  });
  it("If an single operation is bigger than DefaultMaxBulkRequestBodySizeInBytes, it should be part of a Batch containing only that operation.", function () {
    runBatchSplitTestCase({
      inputOperationDescription: [...Array(2).keys()].map((index) => ({
        operationSize: Constants.DefaultMaxBulkRequestBodySizeInBytes + 1,
        index: index,
      })),
      resultingBatchDescription: {
        resultingBatchLength: 2,
        resultingOperationsLengths: [1, 1],
      },
    });
  });
});
64 changes: 63 additions & 1 deletion sdk/cosmosdb/cosmos/test/public/functional/item.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import assert from "assert";
import { Suite } from "mocha";
import {
Constants,
Container,
CosmosClient,
OperationResponse,
Expand All @@ -26,7 +27,7 @@ import {
import { BulkOperationType, OperationInput } from "../../../src";
import { endpoint } from "../common/_testConfig";
import { masterKey } from "../common/_fakeTestSecrets";

import { generateOperationOfSize } from "../../internal/unit/utils/batch.spec";
interface TestItem {
id?: string;
name?: string;
Expand Down Expand Up @@ -229,6 +230,67 @@ describe("Item CRUD", function (this: Suite) {
});

describe("bulk/batch item operations", function () {
describe("Check size based splitting of batches", function () {
  let container: Container;
  before(async function () {
    container = await getTestContainer("bulk container", undefined, {
      partitionKey: {
        paths: ["/key"],
        version: undefined,
      },
      throughput: 400,
    });
  });
  after(async () => {
    await container.database.delete();
  });
  it("Check case when cumulative size of all operations is less than threshold", async function () {
    const operations: OperationInput[] = [...Array(10).keys()].map(
      () =>
        ({
          ...generateOperationOfSize(100, { partitionKey: "key_value" }, { key: "key_value" }),
        } as any)
    );
    const response = await container.items.bulk(operations);
    // Every Create should succeed (201) even though all ops go in a single batch.
    response.forEach((res, index) =>
      assert.strictEqual(res.statusCode, 201, `Status should be 201 for operation ${index}`)
    );
  });
  // Titles below were duplicated; each now states what distinguishes the case.
  it("Check case when cumulative size of all operations, without partition key, is greater than threshold", async function () {
    const operations: OperationInput[] = [...Array(10).keys()].map(
      () =>
        ({
          ...generateOperationOfSize(
            Math.floor(Constants.DefaultMaxBulkRequestBodySizeInBytes / 2)
          ),
          partitionKey: {},
        } as any)
    );
    const response = await container.items.bulk(operations);
    // Every Create should succeed (201) after the batch is split on size.
    response.forEach((res, index) =>
      assert.strictEqual(res.statusCode, 201, `Status should be 201 for operation ${index}`)
    );
  });
  it("Check case when cumulative size of 50 operations, with partition key, is greater than threshold", async function () {
    const operations: OperationInput[] = [...Array(50).keys()].map(
      () =>
        ({
          ...generateOperationOfSize(
            Math.floor(Constants.DefaultMaxBulkRequestBodySizeInBytes / 2),
            {},
            { key: "key_value" }
          ),
        } as any)
    );
    const response = await container.items.bulk(operations);
    // Every Create should succeed (201) across the many split batches.
    response.forEach((res, index) =>
      assert.strictEqual(res.statusCode, 201, `Status should be 201 for operation ${index}`)
    );
  });
});
describe("with v1 container", function () {
let container: Container;
let readItemId: string;
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmosdb/cosmos/test/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"preserveConstEnums": true,
"removeComments": false,
"target": "es6",
"lib": ["es2019"],
"sourceMap": true,
"newLine": "LF",
"composite": true,
Expand Down