diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index bacb4960b7f39..ed039661f61ee 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -130,11 +130,24 @@ protected void setDefaults(String configClassName) {
     });
   }
 
+  public String getString(String key) {
+    return props.getProperty(key);
+  }
+
   public <T> String getString(ConfigProperty<T> configProperty) {
     Option<Object> rawValue = getRawValue(configProperty);
     return rawValue.map(Object::toString).orElse(null);
   }
 
+  public <T> String getStringOrDefault(ConfigProperty<T> configProperty) {
+    return getStringOrDefault(configProperty, configProperty.defaultValue().toString());
+  }
+
+  public <T> String getStringOrDefault(ConfigProperty<T> configProperty, String defaultVal) {
+    Option<Object> rawValue = getRawValue(configProperty);
+    return rawValue.map(Object::toString).orElse(defaultVal);
+  }
+
   public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty) {
     return getSplitStrings(configProperty, ",");
   }
@@ -143,8 +156,12 @@ public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty, String
     return StringUtils.split(getString(configProperty), delimiter);
   }
 
-  public String getString(String key) {
-    return props.getProperty(key);
+  public <T> String[] getSplitStringArray(ConfigProperty<T> configProperty) {
+    return getSplitStrings(configProperty).toArray(new String[0]);
+  }
+
+  public <T> String[] getSplitStringArray(ConfigProperty<T> configProperty, String delimiter) {
+    return getSplitStrings(configProperty, delimiter).toArray(new String[0]);
   }
 
   public <T> Integer getInt(ConfigProperty<T> configProperty) {
@@ -187,15 +204,6 @@ public <T> Double getDouble(ConfigProperty<T> configProperty) {
     return rawValue.map(v -> Double.parseDouble(v.toString())).orElse(null);
   }
 
-  public <T> String getStringOrDefault(ConfigProperty<T> configProperty) {
-    return getStringOrDefault(configProperty, configProperty.defaultValue().toString());
-  }
-
-  public <T> String getStringOrDefault(ConfigProperty<T> configProperty, String defaultVal) {
-    Option<Object> rawValue = getRawValue(configProperty);
-    return rawValue.map(Object::toString).orElse(defaultVal);
-  }
-
   public TypedProperties getProps() {
     return getProps(false);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index 6ee7928c759da..7f615aebc475b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -118,6 +118,22 @@ public static Object loadClass(String clazz, Object... constructorArgs) {
     return loadClass(clazz, constructorArgTypes, constructorArgs);
   }
 
+  /**
+   * Creates an instance of the given class. Constructor arg types are inferred.
+   * Constructor arg combinations are tried in the given order; the first successful instantiation is returned.
+   */
+  public static Object loadClassWithFallbacks(String clazz, Object[][] constructorArgsMatrix) {
+    for (Object[] args : constructorArgsMatrix) {
+      try {
+        return loadClass(clazz, args);
+      } catch (HoodieException e) {
+        LOG.warn(String.format("Unable to instantiate class %s with args %s. Trying next...", clazz, Arrays.toString(args)));
+      }
+    }
+    throw new HoodieException(String.format(
+        "Unable to instantiate class %s with any of the provided constructor args %s.", clazz, Arrays.deepToString(constructorArgsMatrix)));
+  }
+
   /**
    * Scans all classes accessible from the context class loader
    * which belong to the given package and subpackages.
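For review context, this is the call pattern the sync clients and DDL executors adopt later in this patch: try the new `(String[])` constructor with the split partition fields first, then fall back to a no-arg constructor so legacy extractors keep working. A minimal sketch; the `ExtractorFactory` helper is illustrative, not part of the patch:

```java
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;

import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;

class ExtractorFactory {
  // Resolve the configured extractor class; prefer the (String[]) constructor
  // carrying the partition fields, fall back to the no-arg constructor for
  // pre-existing extractor implementations.
  static PartitionValueExtractor createExtractor(HoodieSyncConfig config) {
    return (PartitionValueExtractor) ReflectionUtils.loadClassWithFallbacks(
        config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
        new Object[][] {
            new Object[] {config.getSplitStringArray(META_SYNC_PARTITION_FIELDS)},
            new Object[0]});
  }
}
```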
Trying next...", clazz, Arrays.deepToString(constructorArgsMatrix))); + } + /** * Scans all classes accessible from the context class loader * which belong to the given package and subpackages. diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 654d1aeada2e2..7f8612f1260c6 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -414,8 +414,6 @@ object DataSourceWriteOptions { @Deprecated val HIVE_DATABASE: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_DATABASE_NAME @Deprecated - val hiveTableOptKeyInferFunc: JavaFunction[HoodieConfig, Option[String]] = HoodieSyncConfig.TABLE_NAME_INFERENCE_FUNCTION - @Deprecated val HIVE_TABLE: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_TABLE_NAME @Deprecated val HIVE_BASE_FILE_FORMAT: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT @@ -432,8 +430,6 @@ object DataSourceWriteOptions { @Deprecated val HIVE_PARTITION_FIELDS: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_PARTITION_FIELDS @Deprecated - val hivePartitionExtractorInferFunc: JavaFunction[HoodieConfig, Option[String]] = HoodieSyncConfig.PARTITION_EXTRACTOR_CLASS_FUNCTION - @Deprecated val HIVE_PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS @Deprecated val HIVE_ASSUME_DATE_PARTITION: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala index 9920aa80baf09..289ecb380b9f3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala @@ -18,9 +18,9 @@ package org.apache.hudi import org.apache.hudi.DataSourceWriteOptions._ -import org.apache.hudi.hive.{HiveStylePartitionValueExtractor, MultiPartKeysValueExtractor} import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator} import org.apache.hudi.sync.common.HoodieSyncConfig +import org.apache.hudi.sync.common.model.DefaultPartitionValueExtractor import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -34,7 +34,7 @@ class TestDataSourceOptions { assertEquals(classOf[ComplexKeyGenerator].getName, modifiedOptions1(KEYGENERATOR_CLASS_NAME.key)) assertEquals("hudi_table", modifiedOptions1(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)) assertEquals("year,month", modifiedOptions1(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key)) - assertEquals(classOf[MultiPartKeysValueExtractor].getName, + assertEquals(classOf[DefaultPartitionValueExtractor].getName, modifiedOptions1(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key)) val inputOptions2 = Map( @@ -46,7 +46,7 @@ class TestDataSourceOptions { assertEquals(classOf[SimpleKeyGenerator].getName, modifiedOptions2(KEYGENERATOR_CLASS_NAME.key)) assertEquals("hudi_table", modifiedOptions2(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)) assertEquals("year", modifiedOptions2(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key)) - assertEquals(classOf[HiveStylePartitionValueExtractor].getName, + 
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index ea01ffe15f01a..f24a1d129f8d1 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.StorageSchemes;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.hive.HiveSyncConfig;
@@ -76,13 +77,9 @@ public HMSDDLExecutor(HiveSyncConfig syncConfig) throws HiveException, MetaExcep
     this.syncConfig = syncConfig;
     this.databaseName = syncConfig.getStringOrDefault(META_SYNC_DATABASE_NAME);
     this.client = Hive.get(syncConfig.getHiveConf()).getMSC();
-    try {
-      this.partitionValueExtractor =
-          (PartitionValueExtractor) Class.forName(syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance();
-    } catch (Exception e) {
-      throw new HoodieHiveSyncException(
-          "Failed to initialize PartitionValueExtractor class " + syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS), e);
-    }
+    this.partitionValueExtractor = (PartitionValueExtractor) ReflectionUtils.loadClassWithFallbacks(
+        syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
+        new Object[][] {new Object[] {syncConfig.getSplitStringArray(META_SYNC_PARTITION_FIELDS)}, new Object[0]});
   }
 
   @Override
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
index 43dcc0d217c94..ad2ebd0848126 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.hive.HiveSyncConfig;
@@ -61,13 +62,9 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
   public QueryBasedDDLExecutor(HiveSyncConfig config) {
     this.config = config;
     this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
-    try {
-      this.partitionValueExtractor =
-          (PartitionValueExtractor) Class.forName(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance();
-    } catch (Exception e) {
-      throw new HoodieHiveSyncException(
-          "Failed to initialize PartitionValueExtractor class " + config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS), e);
-    }
+    this.partitionValueExtractor = (PartitionValueExtractor) ReflectionUtils.loadClassWithFallbacks(
+        config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
+        new Object[][] {new Object[] {config.getSplitStringArray(META_SYNC_PARTITION_FIELDS)}, new Object[0]});
   }
 
   /**
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/replication/TestHiveSyncGlobalCommitTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/replication/TestHiveSyncGlobalCommitTool.java
index a11b23762e73a..7c0899152726f 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/replication/TestHiveSyncGlobalCommitTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/replication/TestHiveSyncGlobalCommitTool.java
@@ -74,7 +74,7 @@ private HiveSyncGlobalCommitParams getGlobalCommitConfig(String commitTime) thro
     params.loadedProps.setProperty(META_SYNC_BASE_PATH.key(), localCluster.tablePath(DB_NAME, TBL_NAME));
     params.loadedProps.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "true");
     params.loadedProps.setProperty(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
-    params.loadedProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
+    params.loadedProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "year,month,day");
     return params;
   }
 
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 9687e557928bd..d517c9d3b0ea7 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -47,6 +47,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
 import org.apache.hudi.hive.ddl.QueryBasedDDLExecutor;
 
@@ -92,6 +93,7 @@
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -139,6 +141,7 @@ public static void setUp() throws IOException, InterruptedException, HiveExcepti
     hiveSyncProps.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "true");
     hiveSyncProps.setProperty(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
     hiveSyncProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
+    hiveSyncProps.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), SlashEncodedDayPartitionValueExtractor.class.getName());
     hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3");
 
     hiveSyncConfig = new HiveSyncConfig(hiveSyncProps, configuration);
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 32ade18d08117..a62e5029cd07b 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -47,6 +47,7 @@
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA;
 
 public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, AutoCloseable {
@@ -59,7 +60,9 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
 
   public HoodieSyncClient(HoodieSyncConfig config) {
     this.config = config;
-    this.partitionValueExtractor = ReflectionUtils.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+    this.partitionValueExtractor = (PartitionValueExtractor) ReflectionUtils.loadClassWithFallbacks(
+        config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
+        new Object[][] {new Object[] {config.getSplitStringArray(META_SYNC_PARTITION_FIELDS)}, new Object[0]});
     this.metaClient = HoodieTableMetaClient.builder()
         .setConf(config.getHadoopConf())
         .setBasePath(config.getString(META_SYNC_BASE_PATH))
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index ba763ddc14bb7..c8446a0a6b94a 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -23,7 +23,6 @@
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -37,6 +36,10 @@
 import java.util.Properties;
 import java.util.function.Function;
 
+import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_NAME_KEY;
+import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY;
+
 /**
  * Configs needed to sync data into external meta stores, catalogs, etc.
  */
@@ -56,22 +59,14 @@ public class HoodieSyncConfig extends HoodieConfig {
   public static final ConfigProperty<String> META_SYNC_DATABASE_NAME = ConfigProperty
       .key("hoodie.datasource.hive_sync.database")
      .defaultValue("default")
+      .withInferFunction(cfg -> Option.ofNullable(cfg.getString(DATABASE_NAME)))
       .withDocumentation("The name of the destination database that we should sync the hudi table to.");
 
-  // If the table name for the metastore destination is not provided, pick it up from write or table configs.
-  public static final Function<HoodieConfig, Option<String>> TABLE_NAME_INFERENCE_FUNCTION = cfg -> {
-    if (cfg.contains(HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY)) {
-      return Option.of(cfg.getString(HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY));
-    } else if (cfg.contains(HoodieTableConfig.HOODIE_TABLE_NAME_KEY)) {
-      return Option.of(cfg.getString(HoodieTableConfig.HOODIE_TABLE_NAME_KEY));
-    } else {
-      return Option.empty();
-    }
-  };
   public static final ConfigProperty<String> META_SYNC_TABLE_NAME = ConfigProperty
       .key("hoodie.datasource.hive_sync.table")
       .defaultValue("unknown")
-      .withInferFunction(TABLE_NAME_INFERENCE_FUNCTION)
+      .withInferFunction(cfg -> Option.ofNullable(cfg.getString(HOODIE_WRITE_TABLE_NAME_KEY))
+          .or(() -> Option.ofNullable(cfg.getString(HOODIE_TABLE_NAME_KEY))))
       .withDocumentation("The name of the destination table that we should sync the hudi table to.");
 
   public static final ConfigProperty<String> META_SYNC_BASE_FILE_FORMAT = ConfigProperty
@@ -93,27 +88,10 @@ public class HoodieSyncConfig extends HoodieConfig {
       .withInferFunction(PARTITION_FIELDS_INFERENCE_FUNCTION)
       .withDocumentation("Field in the table to use for determining hive partition columns.");
 
-  // If partition value extraction class is not explicitly provided, configure based on the partition fields.
-  public static final Function<HoodieConfig, Option<String>> PARTITION_EXTRACTOR_CLASS_FUNCTION = cfg -> {
-    if (!cfg.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)) {
-      return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
-    } else {
-      int numOfPartFields = cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME).split(",").length;
-      if (numOfPartFields == 1
-          && cfg.contains(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE)
-          && cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE).equals("true")) {
-        return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor");
-      } else {
-        return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor");
-      }
-    }
-  };
   public static final ConfigProperty<String> META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty
       .key("hoodie.datasource.hive_sync.partition_extractor_class")
-      .defaultValue("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor")
-      .withInferFunction(PARTITION_EXTRACTOR_CLASS_FUNCTION)
-      .withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, "
-          + "default 'SlashEncodedDayPartitionValueExtractor'.");
+      .defaultValue("org.apache.hudi.sync.common.model.DefaultPartitionValueExtractor")
+      .withDocumentation("Class which implements PartitionValueExtractor to extract the partition values.");
 
   public static final ConfigProperty<String> META_SYNC_ASSUME_DATE_PARTITION = ConfigProperty
       .key("hoodie.datasource.hive_sync.assume_date_partitioning")
@@ -148,6 +126,7 @@ public HoodieSyncConfig(Properties props) {
 
   public HoodieSyncConfig(Properties props, Configuration hadoopConf) {
     super(props);
+    setDefaults(getClass().getName());
     this.hadoopConf = hadoopConf;
   }
 
@@ -173,9 +152,9 @@ public String toString() {
   }
 
   public static class HoodieSyncConfigParams {
-    @Parameter(names = {"--database"}, description = "name of the target database in meta store", required = true)
+    @Parameter(names = {"--database"}, description = "name of the target database in meta store")
     public String databaseName;
-    @Parameter(names = {"--table"}, description = "name of the target table in meta store", required = true)
+    @Parameter(names = {"--table"}, description = "name of the target table in meta store")
     public String tableName;
     @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
     public String basePath;
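One behavioral detail worth calling out: the new `setDefaults(getClass().getName())` call in the constructor is what actually applies the infer functions, and the table-name inference prefers the write table name when both keys are set, a case the new test later in this patch does not exercise directly. A hedged sketch under those assumptions; the demo class and the `"tbl"`/`"write_tbl"` values are illustrative:

```java
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.sync.common.HoodieSyncConfig;

public class SyncConfigInferenceDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "tbl");
    props.setProperty(HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY, "write_tbl");
    HoodieSyncConfig config = new HoodieSyncConfig(props, new Configuration());
    // HOODIE_WRITE_TABLE_NAME_KEY takes precedence over HOODIE_TABLE_NAME_KEY.
    System.out.println(config.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME)); // expected: write_tbl
  }
}
```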
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/DefaultPartitionValueExtractor.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/DefaultPartitionValueExtractor.java
new file mode 100644
index 0000000000000..6edec3dbe6eb9
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/DefaultPartitionValueExtractor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.common.model;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class DefaultPartitionValueExtractor implements PartitionValueExtractor {
+
+  private final String[] partitionFields;
+
+  public DefaultPartitionValueExtractor(String[] partitionFields) {
+    this.partitionFields = partitionFields;
+  }
+
+  @Override
+  public List<String> extractPartitionValuesInPath(String partitionPath) {
+    if (partitionFields == null || partitionFields.length == 0) {
+      return Collections.emptyList();
+    }
+
+    ValidationUtils.checkArgument(StringUtils.nonEmpty(partitionPath),
+        "Expected non-empty partition path but got " + partitionPath);
+
+    String[] parts = partitionPath.split("/");
+    int depth = parts.length;
+    ValidationUtils.checkArgument(depth == partitionFields.length,
+        "Expected partition depth of " + partitionFields.length + " but got " + depth);
+
+    String[] partitionValues = new String[depth];
+    Boolean isHiveStyle = null;
+    for (int i = 0; i < depth; i++) {
+      int equalSignIndex = parts[i].indexOf("=");
+      boolean foundHiveStyle = equalSignIndex != -1;
+      if (isHiveStyle == null) {
+        isHiveStyle = foundHiveStyle;
+      } else {
+        ValidationUtils.checkArgument(foundHiveStyle == isHiveStyle,
+            "Expected hiveStyle=" + isHiveStyle + " at depth=" + i + " but got hiveStyle=" + foundHiveStyle);
+      }
+
+      if (isHiveStyle) {
+        String foundFieldName = parts[i].substring(0, equalSignIndex);
+        String fieldName = partitionFields[i];
+        ValidationUtils.checkArgument(Objects.equals(fieldName, foundFieldName),
+            "Expected field `" + fieldName + "` at depth=" + i + " but got `" + foundFieldName + "`");
+        partitionValues[i] = parts[i].substring(equalSignIndex + 1);
+      } else {
+        partitionValues[i] = parts[i];
+      }
+    }
+
+    return Arrays.asList(partitionValues);
+  }
+}
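The extractor decides per path whether the layout is hive-style (`field=value`) or plain slash-encoded, validates that the depth matches the configured partition fields, and for hive-style paths also checks each field name. A usage sketch, assuming a table partitioned on `year/month/day` (the demo class is illustrative):

```java
import java.util.List;

import org.apache.hudi.sync.common.model.DefaultPartitionValueExtractor;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;

public class ExtractorDemo {
  public static void main(String[] args) {
    PartitionValueExtractor extractor =
        new DefaultPartitionValueExtractor(new String[] {"year", "month", "day"});
    // Slash-encoded layout: values are taken positionally.
    List<String> v1 = extractor.extractPartitionValuesInPath("2022/04/01");                // [2022, 04, 01]
    // Hive-style layout: field names are validated against the configured fields.
    List<String> v2 = extractor.extractPartitionValuesInPath("year=2022/month=04/day=01"); // [2022, 04, 01]
    // Mixing the two styles, or a wrong depth or field name, throws IllegalArgumentException.
    System.out.println(v1 + " " + v2);
  }
}
```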
description = "Base path of the hoodie table to sync", required = true) public String basePath; diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/DefaultPartitionValueExtractor.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/DefaultPartitionValueExtractor.java new file mode 100644 index 0000000000000..6edec3dbe6eb9 --- /dev/null +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/DefaultPartitionValueExtractor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.sync.common.model; + +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class DefaultPartitionValueExtractor implements PartitionValueExtractor { + + private final String[] partitionFields; + + public DefaultPartitionValueExtractor(String[] partitionFields) { + this.partitionFields = partitionFields; + } + + @Override + public List extractPartitionValuesInPath(String partitionPath) { + if (partitionFields == null || partitionFields.length == 0) { + return Collections.emptyList(); + } + + ValidationUtils.checkArgument(StringUtils.nonEmpty(partitionPath), + "Expected non-empty partition path but got " + partitionPath); + + String[] parts = partitionPath.split("/"); + int depth = parts.length; + ValidationUtils.checkArgument(depth == partitionFields.length, + "Expected partition depth of " + partitionFields.length + " but got " + depth); + + String[] partitionValues = new String[depth]; + Boolean isHiveStyle = null; + for (int i = 0; i < depth; i++) { + int equalSignIndex = parts[i].indexOf("="); + boolean foundHiveStyle = equalSignIndex != -1; + if (isHiveStyle == null) { + isHiveStyle = foundHiveStyle; + } else { + ValidationUtils.checkArgument(foundHiveStyle == isHiveStyle, + "Expected hiveStyle=" + isHiveStyle + " at depth=" + i + " but got hiveStyle=" + foundHiveStyle); + } + + if (isHiveStyle) { + String foundFieldName = parts[i].substring(0, equalSignIndex); + String fieldName = partitionFields[i]; + ValidationUtils.checkArgument(Objects.equals(fieldName, foundFieldName), + "Expected field `" + fieldName + "` at depth=" + i + " but got `" + foundFieldName + "`"); + partitionValues[i] = parts[i].substring(equalSignIndex + 1); + } else { + partitionValues[i] = parts[i]; + } + } + + return Arrays.asList(partitionValues); + } +} diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncConfig.java new file mode 100644 index 
diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/model/TestDefaultPartitionValueExtractor.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/model/TestDefaultPartitionValueExtractor.java
new file mode 100644
index 0000000000000..f161b3d268bd9
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/model/TestDefaultPartitionValueExtractor.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.common.model;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class TestDefaultPartitionValueExtractor {
+
+  @Test
+  void testNonPartition() {
+    PartitionValueExtractor extractor = new DefaultPartitionValueExtractor(new String[0]);
+    assertEquals(Collections.emptyList(), extractor.extractPartitionValuesInPath(null));
+  }
+
+  @Test
+  void testNonEmptyPartitionsParsingWithIllegalPartitionPath() {
+    PartitionValueExtractor extractor = new DefaultPartitionValueExtractor(new String[] {"foo", "bar"});
+    assertThrows(IllegalArgumentException.class, () -> extractor.extractPartitionValuesInPath(""));
+    assertThrows(IllegalArgumentException.class, () -> extractor.extractPartitionValuesInPath(null));
+  }
+
+  @ParameterizedTest
+  @CsvSource(value = {
+      "a:a:1",
+      "a,b:a/b:2",
+      "a,b,c:a/b/c:3",
+      "a:0=a:1",
+      "a,b:0=a/1=b:2",
+      "a,b,c:0=a/1=b/2=c:3"}, delimiter = ':')
+  void testMultiPartPartitions(String expected, String partitionPath, String depthStr) {
+    int depth = Integer.parseInt(depthStr);
+    String[] partitionFields = IntStream.range(0, depth).mapToObj(String::valueOf).toArray(String[]::new);
+    PartitionValueExtractor extractor = new DefaultPartitionValueExtractor(partitionFields);
+    List<String> partitionValues = extractor.extractPartitionValuesInPath(partitionPath);
+    List<String> expectedPartitionValues = Arrays.asList(expected.split(","));
+    assertEquals(expectedPartitionValues, partitionValues);
+  }
+
+  @ParameterizedTest
+  @CsvSource(value = {
+      "true:0=a/b:2",
+      "false:a/1=b/2=c:3"}, delimiter = ':')
+  void testInconsistentHiveStylePartitions(boolean expectedHiveStyle, String partitionPath, String depthStr) {
+    int depth = Integer.parseInt(depthStr);
+    String[] partitionFields = IntStream.range(0, depth).mapToObj(String::valueOf).toArray(String[]::new);
+    PartitionValueExtractor extractor = new DefaultPartitionValueExtractor(partitionFields);
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> extractor.extractPartitionValuesInPath(partitionPath));
+    assertEquals("Expected hiveStyle=" + expectedHiveStyle + " at depth=1 but got hiveStyle=" + !expectedHiveStyle, t.getMessage());
+  }
+
+  @ParameterizedTest
+  @CsvSource(value = {
+      "0=a/00=b:2:00",
+      "0=a/P1=b/2=c:3:P1"}, delimiter = ':')
+  void testInconsistentFieldNameInHiveStylePartitions(String partitionPath, String depthStr, String unexpectedFieldName) {
+    int depth = Integer.parseInt(depthStr);
+    String[] partitionFields = IntStream.range(0, depth).mapToObj(String::valueOf).toArray(String[]::new);
+    PartitionValueExtractor extractor = new DefaultPartitionValueExtractor(partitionFields);
+    Throwable t = assertThrows(IllegalArgumentException.class, () -> extractor.extractPartitionValuesInPath(partitionPath));
+    assertEquals("Expected field `1` at depth=1 but got `" + unexpectedFieldName + "`", t.getMessage());
+  }
+}
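Putting the pieces together end to end: with no explicit extractor configured, `META_SYNC_PARTITION_EXTRACTOR_CLASS` now resolves to `DefaultPartitionValueExtractor`, which `loadClassWithFallbacks` instantiates with the split `META_SYNC_PARTITION_FIELDS`. A hedged sketch of that flow; the demo class and `"year,month,day"` are illustrative:

```java
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;

public class EndToEndSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "year,month,day");
    HoodieSyncConfig config = new HoodieSyncConfig(props, new Configuration());
    // No extractor class set, so the new default applies; the fallback matrix
    // hands it the split partition fields via the (String[]) constructor.
    PartitionValueExtractor extractor = (PartitionValueExtractor) ReflectionUtils.loadClassWithFallbacks(
        config.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
        new Object[][] {
            new Object[] {config.getSplitStringArray(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS)},
            new Object[0]});
    System.out.println(extractor.extractPartitionValuesInPath("year=2022/month=04/day=01")); // [2022, 04, 01]
  }
}
```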