     transformerClassNames) {
     if (provider == null) {
       return null;
     }
-    if (provider instanceof SchemaProviderWithPostProcessor) {
-      return (SchemaProviderWithPostProcessor)provider;
+    if (provider instanceof SchemaProviderWithPostProcessor) {
+      return (SchemaProviderWithPostProcessor) provider;
     }
     String schemaPostProcessorClass = cfg.getString(Config.SCHEMA_POST_PROCESSOR_PROP, null);
     boolean enableSparkAvroPostProcessor = Boolean.parseBoolean(cfg.getString(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE, "true"));
     if (transformerClassNames != null && !transformerClassNames.isEmpty()
-        && enableSparkAvroPostProcessor && StringUtils.isNullOrEmpty(schemaPostProcessorClass)) {
+        && enableSparkAvroPostProcessor && StringUtils.isNullOrEmpty(schemaPostProcessorClass)) {
       schemaPostProcessorClass = SparkAvroPostProcessor.class.getName();
     }
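
Aside: the hunk above is where DeltaStreamer decides to wrap a schema provider with SparkAvroPostProcessor by default, whenever transformer classes are configured and no explicit post-processor is set. A minimal sketch of opting out via the flag referenced in that hunk; the class name and the println are illustrative, and it assumes hudi-utilities (with Hudi's TypedProperties, which extends java.util.Properties) on the classpath:

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;

    class DisableSparkAvroPostProcessorSketch {
      public static void main(String[] args) {
        TypedProperties cfg = new TypedProperties();
        // The wrapper defaults this flag to "true"; setting it to "false" keeps the
        // schema provider unwrapped even when transformer classes are configured.
        cfg.setProperty(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE, "false");
        System.out.println(cfg);
      }
    }
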
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
index 17fecdeccf0fe..78bb30302b287 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.hudi.utilities.deltastreamer;
 
-import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -31,16 +30,15 @@
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -160,10 +158,9 @@ public void execute() throws IOException {
    */
   private void syncHive() {
     if (cfg.enableHiveSync) {
-      HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat);
-      LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
-          + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
-      new HiveSyncTool(hiveSyncConfig, new HiveConf(configuration, HiveConf.class), fs).syncHoodieTable();
+      props.put(HoodieSyncConfig.META_SYNC_BASE_PATH, cfg.targetBasePath);
+      props.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT, cfg.baseFileFormat);
+      new HiveSyncTool(props, configuration, fs).syncHoodieTable();
     }
   }
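
Note on the hunk above: the old path built a HiveSyncConfig via DataSourceUtils.buildHiveSyncConfig and handed HiveSyncTool a HiveConf; the new path passes the consolidated property bag plus a plain Hadoop Configuration and lets HiveSyncTool resolve its own settings. A minimal standalone sketch of the new call shape, using only the constants and constructor visible in the hunk; the base path and file format values are illustrative placeholders, and actually running it needs a reachable Hive metastore:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.hive.HiveSyncTool;
    import org.apache.hudi.sync.common.HoodieSyncConfig;

    class HiveSyncSketch {
      public static void main(String[] args) throws Exception {
        Configuration hadoopConf = new Configuration();
        FileSystem fs = FileSystem.get(hadoopConf);

        // Sync options now travel in the property bag instead of a HiveSyncConfig object.
        TypedProperties props = new TypedProperties();
        props.put(HoodieSyncConfig.META_SYNC_BASE_PATH, "/tmp/hudi/trips");  // illustrative
        props.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT, "PARQUET");   // illustrative

        // Same three-argument constructor as in the patch.
        new HiveSyncTool(props, hadoopConf, fs).syncHoodieTable();
      }
    }
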
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index eb553c94e43ea..cf8a4caae628a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -41,7 +41,6 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -51,12 +50,11 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.util.SyncUtilHelpers;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
 import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
@@ -77,7 +75,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -95,7 +92,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -174,7 +170,7 @@ public class DeltaSync implements Serializable {
   /**
    * Bag of properties with source, hoodie client, key generator etc.
-   * 
+   *
    * NOTE: These properties are already consolidated w/ CLI provided config-overrides
    */
   private final TypedProperties props;
@@ -268,7 +264,7 @@ public void refreshTimeline() throws IOException {
             SimpleKeyGenerator.class.getName()))
         .setPreCombineField(cfg.sourceOrderingField)
         .initTable(new Configuration(jssc.hadoopConfiguration()),
-            cfg.targetBasePath);
+            cfg.targetBasePath);
     }
   }
@@ -320,7 +316,7 @@ public Pair