Commit 6c983ab

xinlian12 and annie-mac authored
KafkaV2SinkConnector (Azure#38973)
* add sink connector v2 implementation

---------

Co-authored-by: annie-mac <[email protected]>
1 parent 3eb3ce8 commit 6c983ab

59 files changed: +3435 -204 lines changed

eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml

Lines changed: 1 addition & 0 deletions
@@ -319,6 +319,7 @@ the main ServiceBusClientBuilder. -->
 <suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck"
 files="[/\\]azure-cosmos-kafka-connect[/\\]"/>
 <suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" files="com.azure.cosmos.kafka.connect.CosmosDBSourceConnector"/>
+<suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" files="com.azure.cosmos.kafka.connect.CosmosDBSinkConnector"/>
 
 <!-- Checkstyle suppressions for resource manager package -->
 <suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.resourcemanager.*"/>

eng/versioning/external_dependencies.txt

Lines changed: 1 addition & 0 deletions
@@ -393,6 +393,7 @@ cosmos_org.scalastyle:scalastyle-maven-plugin;1.0.0
 ## Cosmos Kafka connector under sdk\cosmos\azure-cosmos-kafka-connect\pom.xml
 # Cosmos Kafka connector runtime dependencies
 cosmos_org.apache.kafka:connect-api;3.6.0
+cosmos_com.jayway.jsonpath:json-path;2.9.0
 # Cosmos Kafka connector tests only
 cosmos_org.apache.kafka:connect-runtime;3.6.0
 cosmos_org.testcontainers:testcontainers;1.19.5

sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@
 
 #### Features Added
 * Added Source connector. See [PR 38748](https://github.com/Azure/azure-sdk-for-java/pull/38748)
+* Added Sink connector. See [PR 38973](https://github.com/Azure/azure-sdk-for-java/pull/38973)
 
 #### Breaking Changes
 
sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md

Lines changed: 13 additions & 0 deletions
@@ -23,3 +23,16 @@
 | `kafka.connect.cosmos.source.metadata.storage.topic` | `_cosmos.metadata.topic` | The name of the topic where the metadata are stored. The metadata topic will be created if it does not already exist, else it will use the pre-created topic. |
 | `kafka.connect.cosmos.source.messageKey.enabled` | `true` | Whether to set the kafka record message key. |
 | `kafka.connect.cosmos.source.messageKey.field` | `id` | The field to use as the message key. |
+
+## Sink Connector Configuration
+| Config Property Name | Default | Description |
+|:---------------------------------------------------------------|:--------------------------|:------------|
+| `kafka.connect.cosmos.sink.database.name` | None | Cosmos DB database name. |
+| `kafka.connect.cosmos.sink.containers.topicMap` | None | A comma delimited list of Kafka topics mapped to Cosmos containers. For example: topic1#con1,topic2#con2. |
+| `kafka.connect.cosmos.sink.errors.tolerance` | `None` | Error tolerance level after exhausting all retries. `None` for fail on error. `All` for log and continue. |
+| `kafka.connect.cosmos.sink.bulk.enabled` | `true` | Flag to indicate whether Cosmos DB bulk mode is enabled for Sink connector. By default it is true. |
+| `kafka.connect.cosmos.sink.bulk.maxConcurrentCosmosPartitions` | `-1` | Cosmos DB Item Write Max Concurrent Cosmos Partitions. If not specified it will be determined based on the number of the container's physical partitions which would indicate every batch is expected to have data from all Cosmos physical partitions. If specified it indicates from at most how many Cosmos Physical Partitions each batch contains data. So this config can be used to make bulk processing more efficient when input data in each batch has been repartitioned to balance to how many Cosmos partitions each batch needs to write. This is mainly useful for very large containers (with hundreds of physical partitions). |
+| `kafka.connect.cosmos.sink.bulk.initialBatchSize` | `1` | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the initial micro batch size is 1. Reduce this when you want to avoid that the first few requests consume too many RUs. |
+| `kafka.connect.cosmos.sink.write.strategy` | `ItemOverwrite` | Cosmos DB Item write Strategy: `ItemOverwrite` (using upsert), `ItemAppend` (using create, ignore pre-existing items i.e., Conflicts), `ItemDelete` (deletes based on id/pk of data frame), `ItemDeleteIfNotModified` (deletes based on id/pk of data frame if etag hasn't changed since collecting id/pk), `ItemOverwriteIfNotModified` (using create if etag is empty, update/replace with etag pre-condition otherwise, if document was updated the pre-condition failure is ignored) |
+| `kafka.connect.cosmos.sink.maxRetryCount` | `10` | Cosmos DB max retry attempts on write failures for Sink connector. By default, the connector will retry on transient write errors for up to 10 times. |
+| `kafka.connect.cosmos.sink.id.strategy` | `ProvidedInValueStrategy` | A strategy used to populate the document with an ``id``. Valid strategies are: ``TemplateStrategy``, ``FullKeyStrategy``, ``KafkaMetadataStrategy``, ``ProvidedInKeyStrategy``, ``ProvidedInValueStrategy``. Configuration properties prefixed with ``id.strategy`` are passed through to the strategy. For example, when using ``id.strategy=TemplateStrategy``, the property ``id.strategy.template`` is passed through to the template strategy and used to specify the template string to be used in constructing the ``id``. |
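
The `topic#container` format documented for `kafka.connect.cosmos.sink.containers.topicMap` can be illustrated with a minimal Java sketch. The class `TopicMapSketch` and the method `parseTopicToContainer` are hypothetical names introduced only for this example. This is not the connector's own parsing code, and the validation rules shown (rejecting entries that do not have exactly one `#`) are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the connector. */
final class TopicMapSketch {
    private TopicMapSketch() { }

    /**
     * Parses a value such as "topic1#con1,topic2#con2" into a
     * topic -> container map, preserving the order of the entries.
     */
    static Map<String, String> parseTopicToContainer(String topicMap) {
        Map<String, String> result = new LinkedHashMap<>();
        if (topicMap == null || topicMap.trim().isEmpty()) {
            return result;
        }
        for (String entry : topicMap.split(",")) {
            String[] parts = entry.trim().split("#");
            // Assumption: each entry must be exactly "<topic>#<container>".
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected topic#container but got: " + entry);
            }
            String topic = parts[0].trim();
            String container = parts[1].trim();
            if (topic.isEmpty() || container.isEmpty()) {
                throw new IllegalArgumentException("Expected topic#container but got: " + entry);
            }
            result.put(topic, container);
        }
        return result;
    }

    public static void main(String[] args) {
        // Uses the example value from the configuration table.
        System.out.println(parseTopicToContainer("topic1#con1,topic2#con2"));
    }
}
```

A sink task could use such a map to look up the target container for each record's topic. How the connector actually resolves containers is not shown in this part of the diff.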

sdk/cosmos/azure-cosmos-kafka-connect/pom.xml

Lines changed: 24 additions & 1 deletion
@@ -47,8 +47,14 @@ Licensed under the MIT License.
 --add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED
 --add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation=ALL-UNNAMED
 --add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.source=com.fasterxml.jackson.databind,ALL-UNNAMED
+--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink=ALL-UNNAMED
+--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink.idStrategy=ALL-UNNAMED
+--add-opens com.azure.cosmos/com.azure.cosmos.implementation=ALL-UNNAMED
 --add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
+--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
+--add-opens com.azure.cosmos/com.azure.cosmos.implementation.faultinjection=ALL-UNNAMED
 --add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
+--add-opens com.azure.cosmos/com.azure.cosmos.implementation.guava25.base=ALL-UNNAMED
 </javaModulesSurefireArgLine>
 </properties>
 
@@ -80,6 +86,13 @@ Licensed under the MIT License.
   <scope>provided</scope>
 </dependency>
 
+<dependency>
+  <groupId>com.azure</groupId>
+  <artifactId>azure-cosmos-test</artifactId>
+  <version>1.0.0-beta.7</version> <!-- {x-version-update;com.azure:azure-cosmos-test;current} -->
+  <scope>test</scope>
+</dependency>
+
 <dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-collections4</artifactId>
@@ -93,6 +106,11 @@ Licensed under the MIT License.
   <scope>test</scope>
   <version>1.10.0</version> <!-- {x-version-update;org.apache.commons:commons-text;external_dependency} -->
 </dependency>
+<dependency>
+  <groupId>com.jayway.jsonpath</groupId>
+  <artifactId>json-path</artifactId>
+  <version>2.9.0</version> <!-- {x-version-update;cosmos_com.jayway.jsonpath:json-path;external_dependency} -->
+</dependency>
 
 <dependency>
   <groupId>org.apache.kafka</groupId>
@@ -235,6 +253,7 @@ Licensed under the MIT License.
   <include>com.azure:*</include>
   <include>org.apache.kafka:connect-api:[3.6.0]</include> <!-- {x-include-update;cosmos_org.apache.kafka:connect-api;external_dependency} -->
   <include>io.confluent:kafka-connect-maven-plugin:[0.12.0]</include> <!-- {x-include-update;cosmos_io.confluent:kafka-connect-maven-plugin;external_dependency} -->
+  <include>com.jayway.jsonpath:json-path:[2.9.0]</include> <!-- {x-include-update;cosmos_com.jayway.jsonpath:json-path;external_dependency} -->
   <include>org.sourcelab:kafka-connect-client:[4.0.4]</include> <!-- {x-include-update;cosmos_org.sourcelab:kafka-connect-client;external_dependency} -->
 </includes>
 </bannedDependencies>
@@ -319,6 +338,10 @@ Licensed under the MIT License.
   <pattern>reactor</pattern>
   <shadedPattern>${shadingPrefix}.reactor</shadedPattern>
 </relocation>
+<relocation>
+  <pattern>com.jayway.jsonpath</pattern>
+  <shadedPattern>${shadingPrefix}.com.jayway.jsonpath</shadedPattern>
+</relocation>
 </relocations>
 <artifactSet>
 <excludes>
@@ -456,7 +479,7 @@ Licensed under the MIT License.
 </profile>
 <profile>
   <!-- integration tests, requires Cosmos DB Emulator Endpoint -->
-  <id>kafka-integration</id>
+  <id>kafka</id>
   <properties>
     <test.groups>kafka</test.groups>
   </properties>
Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.kafka.connect;
+
+import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
+import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
+import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Sink connector that publishes topic messages to CosmosDB.
+ */
+public class CosmosDBSinkConnector extends SinkConnector {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CosmosDBSinkConnector.class);
+
+    private CosmosSinkConfig sinkConfig;
+
+    @Override
+    public void start(Map<String, String> props) {
+        LOGGER.info("Starting the kafka cosmos sink connector");
+        this.sinkConfig = new CosmosSinkConfig(props);
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return CosmosSinkTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        LOGGER.info("Setting task configurations with maxTasks {}", maxTasks);
+        List<Map<String, String>> configs = new ArrayList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            configs.add(this.sinkConfig.originalsStrings());
+        }
+
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+        LOGGER.debug("Kafka Cosmos sink connector is stopped.");
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CosmosSinkConfig.getConfigDef();
+    }
+
+    @Override
+    public String version() {
+        return KafkaCosmosConstants.CURRENT_VERSION;
+    }
+}
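
The `taskConfigs` method above hands every task the same configuration. That fits a sink connector, because Kafka Connect itself distributes topic partitions among sink tasks, so the connector does not need to split work per task. The following is a minimal, dependency-free sketch of that fan-out. `TaskConfigFanOutSketch` is a hypothetical name, and the per-task defensive copy is an assumption standing in for `originalsStrings()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Kafka-free illustration of an identical-config fan-out. */
final class TaskConfigFanOutSketch {
    private TaskConfigFanOutSketch() { }

    /** Returns maxTasks independent copies of the connector properties. */
    static List<Map<String, String>> taskConfigs(Map<String, String> connectorProps, int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            // Assumption: copy per task so that one task's map cannot affect another's.
            configs.add(new HashMap<>(connectorProps));
        }
        return configs;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("kafka.connect.cosmos.sink.database.name", "exampleDb"); // example value only
        System.out.println(taskConfigs(props, 3).size());
    }
}
```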

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosDBSourceConnector.java

Lines changed: 6 additions & 14 deletions
@@ -14,8 +14,8 @@
 import com.azure.cosmos.implementation.query.CompositeContinuationToken;
 import com.azure.cosmos.implementation.routing.Range;
 import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
-import com.azure.cosmos.kafka.connect.implementation.CosmosConstants;
-import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper;
+import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
+import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
 import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
 import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceOffsetStorageReader;
 import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTask;
@@ -102,8 +102,8 @@ public ConfigDef config() {
 
     @Override
     public String version() {
-        return CosmosConstants.CURRENT_VERSION;
-    } // TODO[public preview]: how this is being used
+        return KafkaCosmosConstants.CURRENT_VERSION;
+    }
 
     private List<Map<String, String>> getTaskConfigs(int maxTasks) {
         Pair<MetadataTaskUnit, List<FeedRangeTaskUnit>> taskUnits = this.getAllTaskUnits();
@@ -314,7 +314,7 @@ private List<Range<String>> getFeedRanges(CosmosContainerProperties containerPro
                 .getContainer(containerProperties.getId())
                 .getFeedRanges()
                 .onErrorMap(throwable ->
-                    CosmosExceptionsHelper.convertToConnectException(
+                    KafkaCosmosExceptionsHelper.convertToConnectException(
                         throwable,
                         "GetFeedRanges failed for container " + containerProperties.getId()))
                 .block()
@@ -324,15 +324,7 @@ private List<Range<String>> getFeedRanges(CosmosContainerProperties containerPro
     }
 
     private Map<String, String> getContainersTopicMap(List<CosmosContainerProperties> allContainers) {
-        Map<String, String> topicMapFromConfig =
-            this.config.getContainersConfig().getContainersTopicMap()
-                .stream()
-                .map(containerTopicMapString -> containerTopicMapString.split("#"))
-                .collect(
-                    Collectors.toMap(
-                        containerTopicMapArray -> containerTopicMapArray[1],
-                        containerTopicMapArray -> containerTopicMapArray[0]));
-
+        Map<String, String> topicMapFromConfig = this.config.getContainersConfig().getContainerToTopicMap();
         Map<String, String> effectiveContainersTopicMap = new HashMap<>();
         allContainers.forEach(containerProperties -> {
             // by default, we are using container id as the topic name as well unless customer override through containers.topicMap

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java

Lines changed: 2 additions & 2 deletions
@@ -36,9 +36,9 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi
 
     private static String getUserAgentSuffix(CosmosAccountConfig accountConfig) {
         if (StringUtils.isNotEmpty(accountConfig.getApplicationName())) {
-            return CosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
+            return KafkaCosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
         }
 
-        return CosmosConstants.USER_AGENT_SUFFIX;
+        return KafkaCosmosConstants.USER_AGENT_SUFFIX;
     }
 }
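
The renamed constant is used to build the user agent suffix: the base suffix alone, or the base suffix plus `|` and the application name when one is configured. A minimal standalone sketch of that rule follows. `UserAgentSuffixSketch` is a hypothetical class, and the constant's value here is a placeholder, since the real value of `KafkaCosmosConstants.USER_AGENT_SUFFIX` is not shown in this diff.

```java
/** Hypothetical standalone illustration of the suffix rule shown in the hunk above. */
final class UserAgentSuffixSketch {
    // Placeholder value; the real constant's value is not visible in this diff.
    static final String USER_AGENT_SUFFIX = "ExampleKafkaConnector";

    private UserAgentSuffixSketch() { }

    /** Appends "|<applicationName>" only when an application name is set. */
    static String getUserAgentSuffix(String applicationName) {
        if (applicationName != null && !applicationName.isEmpty()) {
            return USER_AGENT_SUFFIX + "|" + applicationName;
        }
        return USER_AGENT_SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(getUserAgentSuffix("myApp"));
        System.out.println(getUserAgentSuffix(null));
    }
}
```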
