
Commit 472dce0

Added options to fine-tune settings for bulk operations (#43509)

* Added option to override configuration for `minTargetBulkBatchSize`
* Added changelogs
* Update PartitionScopeThresholds.java

Parent: f908324

File tree: 13 files changed (+141, -32 lines)

sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@
 #### Bugs Fixed

 #### Other Changes
+* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)

 ### 4.35.0 (2024-11-27)

sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@
 #### Bugs Fixed

 #### Other Changes
+* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)

 ### 4.35.0 (2024-11-27)

sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@
 #### Bugs Fixed

 #### Other Changes
+* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)

 ### 4.35.0 (2024-11-27)

sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@
 #### Bugs Fixed

 #### Other Changes
+* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)

 ### 4.35.0 (2024-11-27)

sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@
 #### Bugs Fixed

 #### Other Changes
+* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)

 ### 4.35.0 (2024-11-27)

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala

Lines changed: 14 additions & 0 deletions
@@ -89,6 +89,7 @@ private[spark] object CosmosConfigNames {
 val WriteBulkEnabled = "spark.cosmos.write.bulk.enabled"
 val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations"
 val WriteBulkMaxBatchSize = "spark.cosmos.write.bulk.maxBatchSize"
+val WriteBulkMinTargetBatchSize = "spark.cosmos.write.bulk.minTargetBatchSize"
 val WriteBulkMaxConcurrentPartitions = "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions"
 val WriteBulkPayloadSizeInBytes = "spark.cosmos.write.bulk.targetedPayloadSizeInBytes"
 val WriteBulkInitialBatchSize = "spark.cosmos.write.bulk.initialBatchSize"

@@ -195,6 +196,7 @@ private[spark] object CosmosConfigNames {
 WriteBulkPayloadSizeInBytes,
 WriteBulkInitialBatchSize,
 WriteBulkMaxBatchSize,
+WriteBulkMinTargetBatchSize,
 WritePointMaxConcurrency,
 WritePatchDefaultOperationType,
 WritePatchColumnConfigs,

@@ -1162,6 +1164,7 @@ private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy,
 maxMicroBatchPayloadSizeInBytes: Option[Int] = None,
 initialMicroBatchSize: Option[Int] = None,
 maxMicroBatchSize: Option[Int] = None,
+minTargetMicroBatchSize: Option[Int] = None,
 flushCloseIntervalInSeconds: Int = 60,
 maxNoProgressIntervalInSeconds: Int = 180,
 maxRetryNoProgressIntervalInSeconds: Int = 45 * 60,

@@ -1207,6 +1210,15 @@ private object CosmosWriteConfig {
 "too many RUs and you cannot enable thoughput control. NOTE: using throuhgput control is preferred and will." +
 "result in better throughput while still limiting the RU/s used.")

+private val minTargetMicroBatchSize = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMinTargetBatchSize,
+defaultValue = Option.apply(Configs.getMinTargetBulkMicroBatchSize),
+mandatory = false,
+parseFromStringFunction = minTargetBatchSizeString => Math.min(minTargetBatchSizeString.toInt, Configs.getMinTargetBulkMicroBatchSize),
+helpMessage = "Cosmos DB min. target bulk micro batch size - a micro batch will be flushed to the backend " +
+"when the number of documents enqueued exceeds the target micro batch size. The target micro batch size is " +
+"calculated based on the throttling rate. This setting can be used to force the target batch size to have " +
+" at least a certain size. NOTE: This should only be modified in rare edge cases.")
+
 private val bulkMaxPendingOperations = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMaxPendingOperations,
 mandatory = false,
 parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt,

@@ -1445,6 +1457,7 @@
 val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)
 val initialBatchSizeOpt = CosmosConfigEntry.parse(cfg, initialMicroBatchSize)
 val maxBatchSizeOpt = CosmosConfigEntry.parse(cfg, maxMicroBatchSize)
+val minTargetBatchSizeOpt = CosmosConfigEntry.parse(cfg, minTargetMicroBatchSize)
 val writeRetryCommitInterceptor = CosmosConfigEntry
 .parse(cfg, writeOnRetryCommitInterceptor).flatten

@@ -1477,6 +1490,7 @@
 maxMicroBatchPayloadSizeInBytes = microBatchPayloadSizeInBytesOpt,
 initialMicroBatchSize = initialBatchSizeOpt,
 maxMicroBatchSize = maxBatchSizeOpt,
+minTargetMicroBatchSize = minTargetBatchSizeOpt,
 flushCloseIntervalInSeconds = CosmosConfigEntry.parse(cfg, flushCloseIntervalInSeconds).get,
 maxNoProgressIntervalInSeconds = CosmosConfigEntry.parse(cfg, maxNoProgressIntervalInSeconds).get,
 maxRetryNoProgressIntervalInSeconds = CosmosConfigEntry.parse(cfg, maxRetryNoProgressIntervalInSeconds).get,
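
For context, a minimal sketch of how a Spark job could pass the new `spark.cosmos.write.bulk.minTargetBatchSize` option to the connector. The endpoint, key, database, container, and the `df` DataFrame are placeholders, and the other keys are pre-existing connector settings shown only to make the example self-contained:

    // Hypothetical write configuration; only minTargetBatchSize is new in this commit.
    val writeCfg = Map(
      "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
      "spark.cosmos.accountKey" -> "<account-key>",
      "spark.cosmos.database" -> "SampleDatabase",
      "spark.cosmos.container" -> "SampleContainer",
      "spark.cosmos.write.strategy" -> "ItemOverwrite",
      "spark.cosmos.write.bulk.enabled" -> "true",
      // Per the help message above: asks the connector to keep the adaptive target
      // micro batch size at or above 2. Only needed in rare edge cases.
      "spark.cosmos.write.bulk.minTargetBatchSize" -> "2"
    )

    df.write
      .format("cosmos.oltp")
      .options(writeCfg)
      .mode("Append")
      .save()

The same floor can also be set process-wide via the new `COSMOS.MIN_TARGET_BULK_MICRO_BATCH_SIZE` system property added in Configs.java further down.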

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala

Lines changed: 22 additions & 8 deletions
@@ -34,18 +34,20 @@ class SparkE2EWriteITest
 itemWriteStrategy: ItemWriteStrategy,
 hasId: Boolean = true,
 initialBatchSize: Option[Int] = None,
-maxBatchSize: Option[Int] = None)
+maxBatchSize: Option[Int] = None,
+minTargetBatchSize: Option[Int] = None)

 private val upsertParameterTest = Seq(
-UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None),
-UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = None),
-UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = Some(5)),
-UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None),
-UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemAppend, initialBatchSize = None, maxBatchSize = None)
+UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None, minTargetBatchSize = None),
+UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = None, minTargetBatchSize = None),
+UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = Some(5), minTargetBatchSize = None),
+UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = Some(5), minTargetBatchSize = Some(2)),
+UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None, minTargetBatchSize = None),
+UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemAppend, initialBatchSize = None, maxBatchSize = None, minTargetBatchSize = None)
 )

-for (UpsertParameterTest(bulkEnabled, itemWriteStrategy, hasId, initialBatchSize, maxBatchSize) <- upsertParameterTest) {
-it should s"support upserts with bulkEnabled = $bulkEnabled itemWriteStrategy = $itemWriteStrategy hasId = $hasId initialBatchSize = $initialBatchSize, maxBatchSize = $maxBatchSize" in {
+for (UpsertParameterTest(bulkEnabled, itemWriteStrategy, hasId, initialBatchSize, maxBatchSize, minTargetBatchSize) <- upsertParameterTest) {
+it should s"support upserts with bulkEnabled = $bulkEnabled itemWriteStrategy = $itemWriteStrategy hasId = $hasId initialBatchSize = $initialBatchSize, maxBatchSize = $maxBatchSize, minTargetBatchSize = $minTargetBatchSize" in {
 val cosmosEndpoint = TestConfigurations.HOST
 val cosmosMasterKey = TestConfigurations.MASTER_KEY

@@ -90,6 +92,18 @@ class SparkE2EWriteITest
 case None =>
 }

+minTargetBatchSize match {
+case Some(customMinTargetBatchSize) =>
+configMapBuilder += (
+"spark.cosmos.write.bulk.minTargetBatchSize" -> customMinTargetBatchSize.toString,
+)
+
+configOverrideMapBuilder += (
+"spark.cosmos.write.bulk.minTargetBatchSize" -> customMinTargetBatchSize.toString,
+)
+case None =>
+}
+
 val cfg = configMapBuilder.toMap

 val cfgOverwrite = configOverrideMapBuilder.toMap

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholdsTest.java

Lines changed: 9 additions & 1 deletion
@@ -41,7 +41,7 @@ public void neverThrottledShouldResultInMaxBatchSize() {
 }

 @Test(groups = { "unit" })
-public void alwaysThrottledShouldResultInBatSizeOfOne() {
+public void alwaysThrottledShouldResultInBatchSizeOfOne() {
 String pkRangeId = UUID.randomUUID().toString();
 PartitionScopeThresholds thresholds =
 new PartitionScopeThresholds(pkRangeId, new CosmosBulkExecutionOptionsImpl());

@@ -71,5 +71,13 @@ public void initialTargetMicroBatchSize() {
 bulkOperations.setMaxMicroBatchSize(maxBatchSize);
 thresholds = new PartitionScopeThresholds(pkRangeId, bulkOperations);
 assertThat(thresholds.getTargetMicroBatchSizeSnapshot()).isEqualTo(maxBatchSize);
+
+// initial targetBatchSize should be at least by minTargetBatchSize
+int minTargetBatchSize = 5;
+bulkOperations = new CosmosBulkExecutionOptionsImpl();
+bulkOperations.setInitialMicroBatchSize(1);
+bulkOperations.setMinTargetMicroBatchSize(minTargetBatchSize);
+thresholds = new PartitionScopeThresholds(pkRangeId, bulkOperations);
+assertThat(thresholds.getTargetMicroBatchSizeSnapshot()).isEqualTo(minTargetBatchSize);
 }
 }

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 2 additions & 2 deletions
@@ -12,8 +12,8 @@
 * Added support to enable http2 for gateway mode with system property `COSMOS.HTTP2_ENABLED` and system variable `COSMOS_HTTP2_ENABLED`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947)
 * Added support to allow changing http2 max connection pool size with system property `COSMOS.HTTP2_MAX_CONNECTION_POOL_SIZE` and system variable `COSMOS_HTTP2_MAX_CONNECTION_POOL_SIZE`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947)
 * Added support to allow changing http2 max connection pool size with system property `COSMOS.HTTP2_MIN_CONNECTION_POOL_SIZE` and system variable `COSMOS_HTTP2_MIN_CONNECTION_POOL_SIZE`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947)
-* Added support to allow changing http2 max connection pool size with system property `COSMOS.HTTP2_MAX_CONCURRENT_STREAMS` and system variable `COSMOS_HTTP2_MAX_CONCURRENT_STREAMS`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947)
-
+* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)
+
 ### 4.65.0 (2024-11-19)

 #### Features Added

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java

Lines changed: 55 additions & 0 deletions
@@ -208,6 +208,19 @@ public class Configs {
 public static final String PREVENT_INVALID_ID_CHARS_VARIABLE = "COSMOS_PREVENT_INVALID_ID_CHARS";
 public static final boolean DEFAULT_PREVENT_INVALID_ID_CHARS = false;

+// Bulk default settings
+public static final String MIN_TARGET_BULK_MICRO_BATCH_SIZE = "COSMOS.MIN_TARGET_BULK_MICRO_BATCH_SIZE";
+public static final String MIN_TARGET_BULK_MICRO_BATCH_SIZE_VARIABLE = "COSMOS_MIN_TARGET_BULK_MICRO_BATCH_SIZE";
+public static final int DEFAULT_MIN_TARGET_BULK_MICRO_BATCH_SIZE = 1;
+
+public static final String MAX_BULK_MICRO_BATCH_CONCURRENCY = "COSMOS.MAX_BULK_MICRO_BATCH_CONCURRENCY";
+public static final String MAX_BULK_MICRO_BATCH_CONCURRENCY_VARIABLE = "COSMOS_MAX_BULK_MICRO_BATCH_CONCURRENCY";
+public static final int DEFAULT_MAX_BULK_MICRO_BATCH_CONCURRENCY = 1;
+
+public static final String MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS = "COSMOS.MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS";
+public static final String MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS_VARIABLE = "COSMOS_MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS";
+public static final int DEFAULT_MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS = 1000;
+
 // Config of CodingErrorAction on charset decoder for malformed input
 public static final String CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT = "COSMOS.CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT";
 public static final String DEFAULT_CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT = StringUtils.EMPTY;

@@ -495,6 +508,48 @@ public static boolean isIdValueValidationEnabled() {
 return DEFAULT_PREVENT_INVALID_ID_CHARS;
 }

+public static int getMinTargetBulkMicroBatchSize() {
+String valueFromSystemProperty = System.getProperty(MIN_TARGET_BULK_MICRO_BATCH_SIZE);
+if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) {
+return Integer.parseInt(valueFromSystemProperty);
+}
+
+String valueFromEnvVariable = System.getenv(MIN_TARGET_BULK_MICRO_BATCH_SIZE_VARIABLE);
+if (valueFromEnvVariable != null && !valueFromEnvVariable.isEmpty()) {
+return Integer.parseInt(valueFromEnvVariable);
+}
+
+return DEFAULT_MIN_TARGET_BULK_MICRO_BATCH_SIZE;
+}
+
+public static int getMaxBulkMicroBatchConcurrency() {
+String valueFromSystemProperty = System.getProperty(MAX_BULK_MICRO_BATCH_CONCURRENCY);
+if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) {
+return Integer.parseInt(valueFromSystemProperty);
+}
+
+String valueFromEnvVariable = System.getenv(MAX_BULK_MICRO_BATCH_CONCURRENCY_VARIABLE);
+if (valueFromEnvVariable != null && !valueFromEnvVariable.isEmpty()) {
+return Integer.parseInt(valueFromEnvVariable);
+}
+
+return DEFAULT_MAX_BULK_MICRO_BATCH_CONCURRENCY;
+}
+
+public static int getMaxBulkMicroBatchFlushIntervalInMs() {
+String valueFromSystemProperty = System.getProperty(MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS);
+if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) {
+return Integer.parseInt(valueFromSystemProperty);
+}
+
+String valueFromEnvVariable = System.getenv(MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS_VARIABLE);
+if (valueFromEnvVariable != null && !valueFromEnvVariable.isEmpty()) {
+return Integer.parseInt(valueFromEnvVariable);
+}
+
+return DEFAULT_MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS;
+}
+
 public static int getMaxHttpRequestTimeout() {
 String valueFromSystemProperty = System.getProperty(HTTP_MAX_REQUEST_TIMEOUT);
 if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) {
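
Each new getter follows the existing Configs pattern: a JVM system property wins, then the matching environment variable, then the hard-coded default. A minimal sketch of overriding the new defaults at process start; the wrapper object and the chosen values are illustrative only:

    object BulkDefaultsOverride {
      def main(args: Array[String]): Unit = {
        // Floor for the target bulk micro batch size (library default: 1).
        System.setProperty("COSMOS.MIN_TARGET_BULK_MICRO_BATCH_SIZE", "5")
        // Bulk micro batch concurrency (library default: 1).
        System.setProperty("COSMOS.MAX_BULK_MICRO_BATCH_CONCURRENCY", "2")
        // Bulk micro batch flush interval in milliseconds (library default: 1000).
        System.setProperty("COSMOS.MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS", "500")

        // ... create the CosmosClient / start the Spark job after the properties are set ...
      }
    }

The same values can be supplied through the corresponding environment variables (`COSMOS_MIN_TARGET_BULK_MICRO_BATCH_SIZE`, `COSMOS_MAX_BULK_MICRO_BATCH_CONCURRENCY`, `COSMOS_MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS`); when both are set, the system property takes precedence.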
