From 5b6839efcd48d8fd8ef17d4d4d5389d023fcd7b2 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Mon, 11 Jul 2022 12:24:29 +0530 Subject: [PATCH 1/2] [HUDI-3287] Remove hudi-spark dependencies from hudi-kafka-connect-bundle --- packaging/hudi-kafka-connect-bundle/pom.xml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml index cb63e7a5f2e25..b5fcaa4eca0f9 100644 --- a/packaging/hudi-kafka-connect-bundle/pom.xml +++ b/packaging/hudi-kafka-connect-bundle/pom.xml @@ -75,8 +75,6 @@ org.apache.hudi:hudi-common org.apache.hudi:hudi-client-common org.apache.hudi:hudi-java-client - org.apache.hudi:hudi-spark-client - org.apache.hudi:hudi-spark-common_${scala.binary.version} org.apache.hudi:hudi-kafka-connect org.apache.hudi:hudi-utilities_${scala.binary.version} org.apache.hudi:hudi-hive-sync @@ -322,11 +320,6 @@ - - org.apache.hudi - hudi-spark-common_${scala.binary.version} - ${project.version} - From b4db097e1831a79bd0ee2097b98ef3ac855ee31a Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Wed, 21 Sep 2022 18:01:44 +0530 Subject: [PATCH 2/2] Remove usage of CustomKeyGenerator --- .../keygen/factory/HoodieAvroKeyGeneratorFactory.java | 2 +- .../apache/hudi/connect/utils/KafkaConnectUtils.java | 10 ++++------ .../writers/KafkaConnectTransactionServices.java | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java index 0e17aff242860..b24b9a8e2d9b4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java @@ -54,7 +54,7 @@ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOEx return Objects.isNull(keyGenerator) ? createAvroKeyGeneratorByType(props) : keyGenerator; } - private static KeyGenerator createAvroKeyGeneratorByType(TypedProperties props) throws IOException { + public static KeyGenerator createAvroKeyGeneratorByType(TypedProperties props) throws IOException { // Use KeyGeneratorType.SIMPLE as default keyGeneratorType String keyGeneratorType = props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), null); diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java index d62c9a768c4d8..6b08bae2af94b 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java @@ -18,8 +18,6 @@ package org.apache.hudi.connect.utils; -import com.google.protobuf.ByteString; -import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -36,9 +34,11 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.CustomAvroKeyGenerator; -import org.apache.hudi.keygen.CustomKeyGenerator; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import com.google.protobuf.ByteString; +import org.apache.hadoop.conf.Configuration; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.TopicDescription; @@ -185,8 +185,7 @@ public static String getRecordKeyColumns(KeyGenerator keyGenerator) { * @return partition columns Returns the partition columns separated by comma. */ public static String getPartitionColumns(KeyGenerator keyGenerator, TypedProperties typedProperties) { - - if (keyGenerator instanceof CustomKeyGenerator || keyGenerator instanceof CustomAvroKeyGenerator) { + if (keyGenerator instanceof CustomAvroKeyGenerator) { return ((BaseKeyGenerator) keyGenerator).getPartitionPathFields().stream().map( pathField -> Arrays.stream(pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)) .findFirst().orElse("Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}")) @@ -200,7 +199,6 @@ public static String getPartitionColumns(KeyGenerator keyGenerator, TypedPropert return typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()); } - /** * Get the Metadata from the latest commit file. * diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java index 934dbadf1c750..f71d8480c3ef6 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java @@ -84,7 +84,7 @@ public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throw context = new HoodieJavaEngineContext(hadoopConf); try { - KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator( + KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createAvroKeyGeneratorByType( new TypedProperties(connectConfigs.getProps())); String recordKeyFields = KafkaConnectUtils.getRecordKeyColumns(keyGenerator); String partitionColumns = KafkaConnectUtils.getPartitionColumns(keyGenerator,