@@ -130,11 +130,24 @@ protected void setDefaults(String configClassName) {
});
}

public String getString(String key) {
return props.getProperty(key);
}

public <T> String getString(ConfigProperty<T> configProperty) {
Option<Object> rawValue = getRawValue(configProperty);
return rawValue.map(Object::toString).orElse(null);
}

public <T> String getStringOrDefault(ConfigProperty<T> configProperty) {
return getStringOrDefault(configProperty, configProperty.defaultValue().toString());
}

public <T> String getStringOrDefault(ConfigProperty<T> configProperty, String defaultVal) {
Option<Object> rawValue = getRawValue(configProperty);
return rawValue.map(Object::toString).orElse(defaultVal);
}

public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty) {
return getSplitStrings(configProperty, ",");
}
@@ -143,8 +156,12 @@ public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty, String delimiter) {
return StringUtils.split(getString(configProperty), delimiter);
}

public String getString(String key) {
return props.getProperty(key);
public <T> String[] getSplitStringArray(ConfigProperty<T> configProperty) {
return getSplitStrings(configProperty).toArray(new String[0]);
}

public <T> String[] getSplitStringArray(ConfigProperty<T> configProperty, String delimiter) {
return getSplitStrings(configProperty, delimiter).toArray(new String[0]);
}

public <T> Integer getInt(ConfigProperty<T> configProperty) {
@@ -187,15 +204,6 @@ public <T> Double getDouble(ConfigProperty<T> configProperty) {
return rawValue.map(v -> Double.parseDouble(v.toString())).orElse(null);
}

public <T> String getStringOrDefault(ConfigProperty<T> configProperty) {
return getStringOrDefault(configProperty, configProperty.defaultValue().toString());
}

public <T> String getStringOrDefault(ConfigProperty<T> configProperty, String defaultVal) {
Option<Object> rawValue = getRawValue(configProperty);
return rawValue.map(Object::toString).orElse(defaultVal);
}
Comment on lines -190 to -197 (Member Author): re-arranged getString*() methods to keep them close
public TypedProperties getProps() {
return getProps(false);
}
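A quick usage sketch of the re-arranged string helpers; the property key and values are illustrative, not from this PR, and the config is built from Properties the way the sync configs below do:

ConfigProperty<String> FIELDS = ConfigProperty
    .key("hoodie.example.partition.fields")  // hypothetical key
    .defaultValue("year,month,day");

HoodieConfig config = new HoodieConfig(props);
config.getStringOrDefault(FIELDS);       // "year,month,day" when the key is unset
config.getSplitStrings(FIELDS);          // ["year", "month", "day"]
config.getSplitStringArray(FIELDS, ","); // the same values as a String[]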
@@ -118,6 +118,22 @@ public static Object loadClass(String clazz, Object... constructorArgs) {
return loadClass(clazz, constructorArgTypes, constructorArgs);
}

/**
* Creates an instance of the given class, inferring constructor arg types from the args.
* Candidate constructor args are tried in the given order; the first successful instantiation is returned.
*/
public static Object loadClassWithFallbacks(String clazz, Object[][] constructorArgsMatrix) {
for (Object[] args : constructorArgsMatrix) {
try {
return loadClass(clazz, args);
} catch (HoodieException e) {
LOG.warn(String.format("Unable to instantiate class %s with args %s. Trying next...", clazz, Arrays.toString(args)));
}
}
throw new HoodieException(String.format(
"Unable to instantiate class %s with any of the provided arg combinations: %s", clazz, Arrays.deepToString(constructorArgsMatrix)));
}
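Call sites pass candidate constructor-arg lists in priority order. A minimal sketch mirroring the sync call sites below (class name and field values are just examples):

// Try a (String[]) constructor first, then fall back to the no-arg constructor.
PartitionValueExtractor extractor = (PartitionValueExtractor) ReflectionUtils.loadClassWithFallbacks(
    "org.apache.hudi.sync.common.model.DefaultPartitionValueExtractor",
    new Object[][] {
        new Object[] {new String[] {"year", "month", "day"}}, // candidate 1: (String[]) ctor
        new Object[0]                                         // candidate 2: no-arg ctor
    });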

/**
* Scans all classes accessible from the context class loader
* which belong to the given package and subpackages.
@@ -414,8 +414,6 @@ object DataSourceWriteOptions {
@Deprecated
val HIVE_DATABASE: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_DATABASE_NAME
@Deprecated
val hiveTableOptKeyInferFunc: JavaFunction[HoodieConfig, Option[String]] = HoodieSyncConfig.TABLE_NAME_INFERENCE_FUNCTION
@Deprecated
val HIVE_TABLE: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_TABLE_NAME
@Deprecated
val HIVE_BASE_FILE_FORMAT: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT
@@ -432,8 +430,6 @@ object DataSourceWriteOptions {
@Deprecated
val HIVE_PARTITION_FIELDS: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_PARTITION_FIELDS
@Deprecated
val hivePartitionExtractorInferFunc: JavaFunction[HoodieConfig, Option[String]] = HoodieSyncConfig.PARTITION_EXTRACTOR_CLASS_FUNCTION
@Deprecated
val HIVE_PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS
@Deprecated
val HIVE_ASSUME_DATE_PARTITION: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION
@@ -18,9 +18,9 @@
package org.apache.hudi

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.hive.{HiveStylePartitionValueExtractor, MultiPartKeysValueExtractor}
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.model.DefaultPartitionValueExtractor
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

@@ -34,7 +34,7 @@ class TestDataSourceOptions {
assertEquals(classOf[ComplexKeyGenerator].getName, modifiedOptions1(KEYGENERATOR_CLASS_NAME.key))
assertEquals("hudi_table", modifiedOptions1(HoodieSyncConfig.META_SYNC_TABLE_NAME.key))
assertEquals("year,month", modifiedOptions1(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key))
assertEquals(classOf[MultiPartKeysValueExtractor].getName,
assertEquals(classOf[DefaultPartitionValueExtractor].getName,
modifiedOptions1(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key))

val inputOptions2 = Map(
@@ -46,7 +46,7 @@
assertEquals(classOf[SimpleKeyGenerator].getName, modifiedOptions2(KEYGENERATOR_CLASS_NAME.key))
assertEquals("hudi_table", modifiedOptions2(HoodieSyncConfig.META_SYNC_TABLE_NAME.key))
assertEquals("year", modifiedOptions2(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key))
assertEquals(classOf[HiveStylePartitionValueExtractor].getName,
assertEquals(classOf[DefaultPartitionValueExtractor].getName,
modifiedOptions2(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key))
}
}
@@ -20,6 +20,7 @@

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
@@ -76,13 +77,9 @@ public HMSDDLExecutor(HiveSyncConfig syncConfig) throws HiveException, MetaException {
this.syncConfig = syncConfig;
this.databaseName = syncConfig.getStringOrDefault(META_SYNC_DATABASE_NAME);
this.client = Hive.get(syncConfig.getHiveConf()).getMSC();
try {
this.partitionValueExtractor =
(PartitionValueExtractor) Class.forName(syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance();
} catch (Exception e) {
throw new HoodieHiveSyncException(
"Failed to initialize PartitionValueExtractor class " + syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS), e);
}
this.partitionValueExtractor = (PartitionValueExtractor) ReflectionUtils.loadClassWithFallbacks(
syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
new Object[][] {new Object[] {syncConfig.getSplitStringArray(META_SYNC_PARTITION_FIELDS)}, new Object[0]});
}
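The two-row matrix above asks loadClassWithFallbacks to try a constructor taking the partition field names first, then the no-arg constructor. A rough hand-rolled equivalent for intuition (illustrative only; the real method infers arg types and wraps failures in HoodieException):

static PartitionValueExtractor instantiate(String className, String[] partitionFields)
    throws ReflectiveOperationException {
  Class<?> klass = Class.forName(className);
  try {
    // Candidate 1: constructor taking the partition field names.
    return (PartitionValueExtractor) klass.getConstructor(String[].class)
        .newInstance((Object) partitionFields);
  } catch (NoSuchMethodException e) {
    // Candidate 2: no-arg constructor.
    return (PartitionValueExtractor) klass.getConstructor().newInstance();
  }
}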

@Override
@@ -21,6 +21,7 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hive.HiveSyncConfig;
@@ -61,13 +62,9 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
public QueryBasedDDLExecutor(HiveSyncConfig config) {
this.config = config;
this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
try {
this.partitionValueExtractor =
(PartitionValueExtractor) Class.forName(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance();
} catch (Exception e) {
throw new HoodieHiveSyncException(
"Failed to initialize PartitionValueExtractor class " + config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS), e);
}
this.partitionValueExtractor = (PartitionValueExtractor) ReflectionUtils.loadClassWithFallbacks(
config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
new Object[][] {new Object[] {config.getSplitStringArray(META_SYNC_PARTITION_FIELDS)}, new Object[0]});
}

/**
@@ -74,7 +74,7 @@ private HiveSyncGlobalCommitParams getGlobalCommitConfig(String commitTime) throws
params.loadedProps.setProperty(META_SYNC_BASE_PATH.key(), localCluster.tablePath(DB_NAME, TBL_NAME));
params.loadedProps.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "true");
params.loadedProps.setProperty(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
params.loadedProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
params.loadedProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "year,month,day");
return params;
}

@@ -47,6 +47,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
import org.apache.hudi.hive.ddl.QueryBasedDDLExecutor;

@@ -92,6 +93,7 @@
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.junit.jupiter.api.Assertions.fail;
@@ -139,6 +141,7 @@ public static void setUp() throws IOException, InterruptedException, HiveException {
hiveSyncProps.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "true");
hiveSyncProps.setProperty(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
hiveSyncProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
hiveSyncProps.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), SlashEncodedDayPartitionValueExtractor.class.getName());
hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3");

hiveSyncConfig = new HiveSyncConfig(hiveSyncProps, configuration);
@@ -47,6 +47,7 @@
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA;

public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, AutoCloseable {
@@ -59,7 +60,9 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, AutoCloseable {

public HoodieSyncClient(HoodieSyncConfig config) {
this.config = config;
this.partitionValueExtractor = ReflectionUtils.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
this.partitionValueExtractor = (PartitionValueExtractor) ReflectionUtils.loadClassWithFallbacks(
config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
new Object[][] {new Object[] {config.getSplitStringArray(META_SYNC_PARTITION_FIELDS)}, new Object[0]});
this.metaClient = HoodieTableMetaClient.builder()
.setConf(config.getHadoopConf())
.setBasePath(config.getString(META_SYNC_BASE_PATH))
@@ -23,7 +23,6 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -37,6 +36,10 @@
import java.util.Properties;
import java.util.function.Function;

import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_NAME_KEY;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY;

/**
* Configs needed to sync data into external meta stores, catalogs, etc.
*/
@@ -56,22 +59,14 @@ public class HoodieSyncConfig extends HoodieConfig {
public static final ConfigProperty<String> META_SYNC_DATABASE_NAME = ConfigProperty
.key("hoodie.datasource.hive_sync.database")
.defaultValue("default")
.withInferFunction(cfg -> Option.ofNullable(cfg.getString(DATABASE_NAME)))
.withDocumentation("The name of the destination database that we should sync the hudi table to.");

// If the table name for the metastore destination is not provided, pick it up from write or table configs.
public static final Function<HoodieConfig, Option<String>> TABLE_NAME_INFERENCE_FUNCTION = cfg -> {
if (cfg.contains(HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY)) {
return Option.of(cfg.getString(HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY));
} else if (cfg.contains(HoodieTableConfig.HOODIE_TABLE_NAME_KEY)) {
return Option.of(cfg.getString(HoodieTableConfig.HOODIE_TABLE_NAME_KEY));
} else {
return Option.empty();
}
};
public static final ConfigProperty<String> META_SYNC_TABLE_NAME = ConfigProperty
.key("hoodie.datasource.hive_sync.table")
.defaultValue("unknown")
.withInferFunction(TABLE_NAME_INFERENCE_FUNCTION)
.withInferFunction(cfg -> Option.ofNullable(cfg.getString(HOODIE_WRITE_TABLE_NAME_KEY))
.or(() -> Option.ofNullable(cfg.getString(HOODIE_TABLE_NAME_KEY))))
.withDocumentation("The name of the destination table that we should sync the hudi table to.");

public static final ConfigProperty<String> META_SYNC_BASE_FILE_FORMAT = ConfigProperty
@@ -93,27 +88,10 @@ public class HoodieSyncConfig extends HoodieConfig {
.withInferFunction(PARTITION_FIELDS_INFERENCE_FUNCTION)
.withDocumentation("Field in the table to use for determining hive partition columns.");

// If partition value extraction class is not explicitly provided, configure based on the partition fields.
public static final Function<HoodieConfig, Option<String>> PARTITION_EXTRACTOR_CLASS_FUNCTION = cfg -> {
if (!cfg.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)) {
return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
} else {
int numOfPartFields = cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME).split(",").length;
if (numOfPartFields == 1
&& cfg.contains(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE)
&& cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE).equals("true")) {
return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor");
} else {
return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor");
}
}
};
public static final ConfigProperty<String> META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty
.key("hoodie.datasource.hive_sync.partition_extractor_class")
.defaultValue("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor")
.withInferFunction(PARTITION_EXTRACTOR_CLASS_FUNCTION)
.withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, "
+ "default 'SlashEncodedDayPartitionValueExtractor'.");
.defaultValue("org.apache.hudi.sync.common.model.DefaultPartitionValueExtractor")
.withDocumentation("Class which implements PartitionValueExtractor to extract the partition values.");

public static final ConfigProperty<String> META_SYNC_ASSUME_DATE_PARTITION = ConfigProperty
.key("hoodie.datasource.hive_sync.assume_date_partitioning")
@@ -148,6 +126,7 @@ public HoodieSyncConfig(Properties props) {

public HoodieSyncConfig(Properties props, Configuration hadoopConf) {
super(props);
setDefaults(getClass().getName());
this.hadoopConf = hadoopConf;
}

@@ -173,9 +152,9 @@ public String toString() {
}

public static class HoodieSyncConfigParams {
@Parameter(names = {"--database"}, description = "name of the target database in meta store", required = true)
@Parameter(names = {"--database"}, description = "name of the target database in meta store")
public String databaseName;
@Parameter(names = {"--table"}, description = "name of the target table in meta store", required = true)
@Parameter(names = {"--table"}, description = "name of the target table in meta store")
public String tableName;
@Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
public String basePath;