[Kafka connector] AddSupportForThroughputBucket #48009
Conversation
API Change Check: APIView identified API level changes in this PR and created the following API reviews.
Pull request overview
This pull request adds support for the throughput bucket feature to the Azure Cosmos Kafka connector, enabling server-side throughput control as an alternative to SDK-level throughput control. The implementation introduces a class hierarchy for throughput control configurations, with CosmosThroughputControlConfig as an abstract base class and two concrete implementations: CosmosSDKThroughputControlConfig for the existing SDK-level control and CosmosServerThroughputControlConfig for the new server-side bucket control.
Changes:
- Introduced throughput bucket configuration support with validation to prevent mixing with SDK-level throughput control settings
- Refactored throughput control configuration into an inheritance hierarchy to support both SDK-level and server-side throughput control modes
- Updated connector and task implementations to properly handle both throughput control modes using instanceof checks
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| CosmosServerThroughputControlConfig.java | New class for server-side throughput bucket configuration |
| CosmosSDKThroughputControlConfig.java | New class for existing SDK-level throughput control configuration |
| CosmosThroughputControlConfig.java | Refactored to abstract base class with common properties |
| KafkaCosmosConfig.java | Added throughput bucket config parsing and validation logic |
| CosmosThroughputControlHelper.java | Updated to support both server and SDK throughput control modes |
| CosmosSourceConnector.java | Updated to use instanceof checks for SDK-specific config properties |
| CosmosSinkConnector.java | Updated to use instanceof checks for SDK-specific config properties |
| CosmosSourceTask.java | Updated to use instanceof checks for SDK-specific config properties |
| CosmosSinkTask.java | Updated to use instanceof checks for SDK-specific config properties |
| CosmosSinkConnectorTest.java | Added tests for throughput bucket configuration and validation |
| CosmosSourceConnectorTest.java | Updated existing tests to use new SDK config class; added config entry for throughput bucket |
| CHANGELOG.md | Added entry for new throughput bucket feature |
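The refactored hierarchy and the instanceof dispatch described above can be sketched as follows. This is an illustrative sketch only: the field names, constructors, and the `describe` helper are assumptions for demonstration, not the connector's actual API.

```java
// Sketch of the described hierarchy: an abstract base with common properties,
// plus SDK-level and server-side concrete implementations.
abstract class CosmosThroughputControlConfig {
    private final String groupName; // common property shared by both modes

    protected CosmosThroughputControlConfig(String groupName) {
        this.groupName = groupName;
    }

    public String getGroupName() {
        return groupName;
    }
}

// SDK-level (client-side) throughput control, as before this PR.
class CosmosSDKThroughputControlConfig extends CosmosThroughputControlConfig {
    private final int targetThroughput;

    CosmosSDKThroughputControlConfig(String groupName, int targetThroughput) {
        super(groupName);
        this.targetThroughput = targetThroughput;
    }

    public int getTargetThroughput() {
        return targetThroughput;
    }
}

// New server-side throughput bucket control.
class CosmosServerThroughputControlConfig extends CosmosThroughputControlConfig {
    private final int throughputBucket;

    CosmosServerThroughputControlConfig(String groupName, int throughputBucket) {
        super(groupName);
        this.throughputBucket = throughputBucket;
    }

    public int getThroughputBucket() {
        return throughputBucket;
    }
}

public class ThroughputControlSketch {
    // Mirrors the instanceof checks the connector and task classes use
    // to branch on SDK-specific vs. server-side config properties.
    static String describe(CosmosThroughputControlConfig config) {
        if (config instanceof CosmosSDKThroughputControlConfig) {
            return "sdk:" + ((CosmosSDKThroughputControlConfig) config).getTargetThroughput();
        }
        return "server:" + ((CosmosServerThroughputControlConfig) config).getThroughputBucket();
    }

    public static void main(String[] args) {
        System.out.println(describe(new CosmosSDKThroughputControlConfig("group", 400)));
        System.out.println(describe(new CosmosServerThroughputControlConfig("group", 1)));
    }
}
```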
Comments suppressed due to low confidence (1)
sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorTest.java:735
- The CosmosSourceConnectorTest is missing test coverage for the new throughput bucket feature. The CosmosSinkConnectorTest includes two new test methods: `sinkConfigWithThroughputBucket` (testing a valid throughput bucket configuration) and `invalidThroughputBucketConfig` (testing various invalid configurations). Similar test methods should be added to CosmosSourceConnectorTest to ensure consistent test coverage for both source and sink connectors.
```java
@Test(groups = { "unit" })
public void invalidThroughputControlConfig() {
    CosmosSourceConnector sourceConnector = new CosmosSourceConnector();

    // invalid targetThroughput, targetThroughputThreshold, priorityLevel config
    // and missing required config for throughput control container info
    Map<String, String> sourceConfigMap = this.getValidSourceConfig();
    sourceConfigMap.put("azure.cosmos.throughputControl.enabled", "true");
    sourceConfigMap.put("azure.cosmos.throughputControl.targetThroughput", "-1");
    sourceConfigMap.put("azure.cosmos.throughputControl.targetThroughputThreshold", "-1");
    sourceConfigMap.put("azure.cosmos.throughputControl.priorityLevel", "None");

    Config config = sourceConnector.validate(sourceConfigMap);
    Map<String, List<String>> errorMessages = config.configValues().stream()
        .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
    assertThat(errorMessages.get("azure.cosmos.throughputControl.group.name").size()).isGreaterThan(0);
    assertThat(errorMessages.get("azure.cosmos.throughputControl.targetThroughput").size()).isGreaterThan(0);
    assertThat(errorMessages.get("azure.cosmos.throughputControl.targetThroughputThreshold").size()).isGreaterThan(0);
    assertThat(errorMessages.get("azure.cosmos.throughputControl.priorityLevel").size()).isGreaterThan(0);
    assertThat(errorMessages.get("azure.cosmos.throughputControl.globalControl.database.name").size()).isGreaterThan(0);
    assertThat(errorMessages.get("azure.cosmos.throughputControl.globalControl.container.name").size()).isGreaterThan(0);

    // invalid throughput control account config with masterKey auth
    sourceConfigMap = this.getValidSourceConfig();
    sourceConfigMap.put("azure.cosmos.throughputControl.enabled", "true");
    sourceConfigMap.put("azure.cosmos.throughputControl.targetThroughput", "1");
    sourceConfigMap.put("azure.cosmos.throughputControl.globalControl.database.name", "ThroughputControlDatabase");
    sourceConfigMap.put("azure.cosmos.throughputControl.globalControl.container.name", "ThroughputControlContainer");
    sourceConfigMap.put("azure.cosmos.throughputControl.group.name", "groupName");
    sourceConfigMap.put("azure.cosmos.throughputControl.account.endpoint", KafkaCosmosTestConfigurations.HOST);

    config = sourceConnector.validate(sourceConfigMap);
    errorMessages = config.configValues().stream()
        .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
    assertThat(errorMessages.get("azure.cosmos.throughputControl.account.key").size()).isGreaterThan(0);

    // targetThroughputThreshold is not supported when using aad auth for throughput control
    sourceConfigMap = this.getValidSourceConfig();
    sourceConfigMap.put("azure.cosmos.throughputControl.enabled", "true");
    sourceConfigMap.put("azure.cosmos.throughputControl.targetThroughputThreshold", "0.9");
    sourceConfigMap.put("azure.cosmos.throughputControl.globalControl.database.name", "ThroughputControlDatabase");
    sourceConfigMap.put("azure.cosmos.throughputControl.globalControl.container.name", "ThroughputControlContainer");
    sourceConfigMap.put("azure.cosmos.throughputControl.group.name", "groupName");
    sourceConfigMap.put("azure.cosmos.throughputControl.account.endpoint", KafkaCosmosTestConfigurations.HOST);
    sourceConfigMap.put("azure.cosmos.throughputControl.auth.type", CosmosAuthType.SERVICE_PRINCIPAL.getName());

    config = sourceConnector.validate(sourceConfigMap);
    errorMessages = config.configValues().stream()
        .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
    assertThat(errorMessages.get("azure.cosmos.throughputControl.auth.aad.clientId").size()).isGreaterThan(0);
    assertThat(errorMessages.get("azure.cosmos.throughputControl.auth.aad.clientSecret").size()).isGreaterThan(0);
    assertThat(errorMessages.get("azure.cosmos.throughputControl.account.tenantId").size()).isGreaterThan(0);
}
```
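The validation rule this PR introduces (a throughput bucket setting cannot be mixed with SDK-level throughput control settings) can be sketched as a standalone check. The key names, in particular `BUCKET_KEY`, and the return shape are hypothetical illustrations, not the connector's actual configuration keys or validation API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ThroughputBucketValidationSketch {
    // Hypothetical key name for the server-side bucket; the connector's real key may differ.
    static final String BUCKET_KEY = "azure.cosmos.throughputControl.serverThroughputBucket";

    // SDK-level-only settings that must not be combined with the bucket setting
    // (these three keys do appear in the tests above).
    static final List<String> SDK_ONLY_KEYS = List.of(
        "azure.cosmos.throughputControl.targetThroughput",
        "azure.cosmos.throughputControl.targetThroughputThreshold",
        "azure.cosmos.throughputControl.priorityLevel");

    // Returns one error message per SDK-level setting that is mixed with the bucket setting.
    static List<String> validate(Map<String, String> config) {
        List<String> errors = new ArrayList<>();
        if (config.containsKey(BUCKET_KEY)) {
            for (String sdkKey : SDK_ONLY_KEYS) {
                if (config.containsKey(sdkKey)) {
                    errors.add(sdkKey + " cannot be combined with " + BUCKET_KEY);
                }
            }
        }
        return errors;
    }
}
```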
/azp run java - cosmos - kafka

Azure Pipelines successfully started running 1 pipeline(s).
Changes
Add throughput bucket support in the Kafka connector. For background on the feature, see the throughput bucket documentation: https://learn.microsoft.com/en-us/azure/cosmos-db/throughput-buckets?tabs=dotnet
Samples
In the connector configs, please add the following configs:
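The original sample configs were not captured here; as a hypothetical illustration only (the property names below are assumptions, not the connector's documented keys), a server-side throughput control setup might look like:

```properties
# Hypothetical property names for illustration; consult the connector docs for the real keys.
azure.cosmos.throughputControl.enabled=true
azure.cosmos.throughputControl.group.name=groupName
azure.cosmos.throughputControl.serverThroughputBucket=1
```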
Test
Manual e2e tests with throughput bucket enabled: