34 changes: 18 additions & 16 deletions docker/demo/sparksql-incremental.commands
@@ -21,6 +21,8 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.spark.sql.SaveMode;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hadoop.fs.FileSystem;

@@ -43,14 +45,14 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
option(DataSourceWriteOptions.HIVE_TABLE.key(), "stock_ticks_derived_mor").
option(DataSourceWriteOptions.HIVE_DATABASE.key(), "default").
option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(DataSourceWriteOptions.HIVE_USER.key(), "hive").
option(DataSourceWriteOptions.HIVE_PASS.key(), "hive").
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true").
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "datestr").
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfig.HIVE_USER.key(), "hive").
option(HiveSyncConfig.HIVE_PASS.key(), "hive").
option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor");
@@ -75,14 +77,14 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
option(DataSourceWriteOptions.HIVE_TABLE.key(), "stock_ticks_derived_mor_bs").
option(DataSourceWriteOptions.HIVE_DATABASE.key(), "default").
option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(DataSourceWriteOptions.HIVE_USER.key(), "hive").
option(DataSourceWriteOptions.HIVE_PASS.key(), "hive").
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true").
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "datestr").
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfig.HIVE_USER.key(), "hive").
option(HiveSyncConfig.HIVE_PASS.key(), "hive").
option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor_bs");
@@ -104,7 +104,7 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

public static final ConfigProperty<String> TBL_NAME = ConfigProperty
.key("hoodie.table.name")
.key(HoodieTableConfig.HOODIE_TABLE_NAME_KEY)
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs.");

@@ -82,6 +82,8 @@ public class HoodieTableConfig extends HoodieConfig {

public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
public static final String HOODIE_PROPERTIES_FILE_BACKUP = "hoodie.properties.backup";
public static final String HOODIE_WRITE_TABLE_NAME_KEY = "hoodie.datasource.write.table.name";
public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name";

public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
.key("hoodie.database.name")
@@ -90,7 +92,7 @@ public class HoodieTableConfig extends HoodieConfig {
+ "we can set it to limit the table name under a specific database");

public static final ConfigProperty<String> NAME = ConfigProperty
.key("hoodie.table.name")
.key(HOODIE_TABLE_NAME_KEY)
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with Hive. Needs to be same across runs.");

@@ -92,6 +92,24 @@ public static Object loadClass(String clazz, Class<?>[] constructorArgTypes, Obj
}
}

/**
* Check whether the given class has the target constructor.
*
* When catching a {@link HoodieException} from {@link #loadClass}, it is hard to tell whether the exception was
* thrown by the instantiation's own logic or because the constructor is missing.
*
* TODO: ReflectionUtils should throw a specific exception to indicate a reflection problem.
*/
public static boolean hasConstructor(String clazz, Class<?>[] constructorArgTypes) {
try {
getClass(clazz).getConstructor(constructorArgTypes);
return true;
} catch (NoSuchMethodException e) {
LOG.warn("Unable to instantiate class " + clazz, e);
return false;
}
}
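
For context, here is a minimal sketch (not part of this patch; the class name, properties, and fallback are assumed for illustration) of how a caller might use hasConstructor to pick between a newer constructor shape and a no-arg fallback:

import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.util.ReflectionUtils;

public class SyncToolInstantiationSketch {
  // Prefer the (Properties, FileSystem) constructor when the class declares it,
  // otherwise fall back to the no-arg constructor. Purely illustrative.
  static Object instantiate(String syncToolClassName, Properties props, FileSystem fs) {
    Class<?>[] preferredArgTypes = new Class<?>[] {Properties.class, FileSystem.class};
    if (ReflectionUtils.hasConstructor(syncToolClassName, preferredArgTypes)) {
      return ReflectionUtils.loadClass(syncToolClassName, preferredArgTypes, props, fs);
    }
    return ReflectionUtils.loadClass(syncToolClassName);
  }
}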

/**
* Creates an instance of the given class. Constructor arg types are inferred.
*/
@@ -18,17 +18,19 @@

package org.apache.hudi.integ.testsuite.dag.nodes;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
import org.apache.hudi.sync.common.HoodieSyncConfig;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
* A hive query node in the DAG of operations for a workflow. Used to perform a hive query with a given config.
@@ -46,13 +48,14 @@ public HiveQueryNode(DeltaConfig.Config config) {
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing hive query node {}", this.getName());
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
HiveSyncConfig hiveSyncConfig = DataSourceUtils
.buildHiveSyncConfig(executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getProps(),
executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().targetBasePath,
executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
TypedProperties properties = new TypedProperties();
properties.putAll(executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getProps());
properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().targetBasePath);
properties.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(properties);
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
Connection con = DriverManager.getConnection(hiveSyncConfig.jdbcUrl, hiveSyncConfig.hiveUser,
hiveSyncConfig.hivePass);
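
For reference, a compact sketch (base path and file format values are placeholders) of the properties-driven construction that replaces DataSourceUtils.buildHiveSyncConfig(...) in this node:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncConfig;

public class HiveSyncConfigSketch {
  // Copy the writer's properties, add the meta-sync base path and base file format,
  // then let HiveSyncConfig read everything it needs from the properties bag.
  static HiveSyncConfig build(TypedProperties writerProps, String basePath, String baseFileFormat) {
    TypedProperties props = new TypedProperties();
    props.putAll(writerProps);
    props.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), basePath);
    props.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), baseFileFormat);
    return new HiveSyncConfig(props);
  }
}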
@@ -18,31 +18,30 @@

package org.apache.hudi.integ.testsuite.dag.nodes;

import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;

import org.apache.hadoop.fs.Path;

/**
* Represents a hive sync node in the DAG of operations for a workflow. Helps to sync hoodie data to hive table.
*/
public class HiveSyncNode extends DagNode<Boolean> {

private HiveServiceProvider hiveServiceProvider;

public HiveSyncNode(Config config) {
this.config = config;
this.hiveServiceProvider = new HiveServiceProvider(config);
}

@Override
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing hive sync node");
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
this.hiveServiceProvider.stopLocalHiveServiceIfNeeded();
}

public HiveServiceProvider getHiveServiceProvider() {
return hiveServiceProvider;
SyncUtilHelpers.runHoodieMetaSync(HiveSyncTool.class.getName(), new TypedProperties(executionContext.getHoodieTestSuiteWriter().getProps()),
executionContext.getHoodieTestSuiteWriter().getConfiguration(),
new Path(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath).getFileSystem(executionContext.getHoodieTestSuiteWriter().getConfiguration()),
executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath, executionContext.getHoodieTestSuiteWriter().getCfg().baseFileFormat);
}
}
@@ -21,6 +21,9 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.service.server.HiveServer2;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.testutils.HiveTestService;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
@@ -46,12 +49,17 @@ public void startLocalHiveServiceIfNeeded(Configuration configuration) throws IO
}

public void syncToLocalHiveIfNeeded(HoodieTestSuiteWriter writer) {
HiveSyncTool hiveSyncTool;
if (this.config.isHiveLocal()) {
writer.getDeltaStreamerWrapper().getDeltaSyncService().getDeltaSync()
.syncHive(getLocalHiveServer().getHiveConf());
hiveSyncTool = new HiveSyncTool(writer.getWriteConfig().getProps(),

[Inline review thread on this line]
Contributor: Have we removed or deprecated DeltaSync.syncHive(...) in this patch? If not, would prefer to go via the DeltaSync syncHive API.
Contributor (author): This is just a helper function, right? The flow seems a bit weird to call the method from DeltaSync. I removed the method in DeltaSync as well, since this was the only place it was used. Can discuss more 1:1 if required.

getLocalHiveServer().getHiveConf(),
FSUtils.getFs(writer.getWriteConfig().getBasePath(), getLocalHiveServer().getHiveConf()));
} else {
writer.getDeltaStreamerWrapper().getDeltaSyncService().getDeltaSync().syncHive();
hiveSyncTool = new HiveSyncTool(writer.getWriteConfig().getProps(),
getLocalHiveServer().getHiveConf(),
FSUtils.getFs(writer.getWriteConfig().getBasePath(), writer.getConfiguration()));
}
hiveSyncTool.syncHoodieTable();
}

public void stopLocalHiveServiceIfNeeded() throws IOException {
@@ -22,6 +22,7 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
@@ -173,10 +174,10 @@ private static TypedProperties getProperties() {
// Make path selection test suite specific
props.setProperty("hoodie.deltastreamer.source.input.selector", DFSTestSuitePathSelector.class.getName());
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL().key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE().key(), "testdb1");
props.setProperty(DataSourceWriteOptions.HIVE_TABLE().key(), "table1");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "datestr");
props.setProperty(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb1");
props.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), "table1");
props.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
props.setProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), TimestampBasedKeyGenerator.class.getName());

props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
@@ -32,8 +32,6 @@
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
import org.apache.hudi.keygen.CustomKeyGenerator;
@@ -59,7 +57,6 @@
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -269,32 +266,4 @@ public static List<WriteStatus> getWriteStatuses(ControlMessage.ParticipantInfo
ControlMessage.ConnectWriteStatus connectWriteStatus = participantInfo.getWriteStatus();
return SerializationUtils.deserialize(connectWriteStatus.getSerializedWriteStatus().toByteArray());
}

/**
* Build Hive Sync Config
* Note: This method is a temporary solution.
* Future solutions can be referred to: https://issues.apache.org/jira/browse/HUDI-3199
*/
public static HiveSyncConfig buildSyncConfig(TypedProperties props, String tableBasePath) {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.basePath = tableBasePath;
hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(KafkaConnectConfigs.HIVE_USE_PRE_APACHE_INPUT_FORMAT, false);
hiveSyncConfig.databaseName = props.getString(KafkaConnectConfigs.HIVE_DATABASE, "default");
hiveSyncConfig.tableName = props.getString(KafkaConnectConfigs.HIVE_TABLE, "");
hiveSyncConfig.hiveUser = props.getString(KafkaConnectConfigs.HIVE_USER, "");
hiveSyncConfig.hivePass = props.getString(KafkaConnectConfigs.HIVE_PASS, "");
hiveSyncConfig.jdbcUrl = props.getString(KafkaConnectConfigs.HIVE_URL, "");
hiveSyncConfig.partitionFields = props.getStringList(KafkaConnectConfigs.HIVE_PARTITION_FIELDS, ",", Collections.emptyList());
hiveSyncConfig.partitionValueExtractorClass =
props.getString(KafkaConnectConfigs.HIVE_PARTITION_EXTRACTOR_CLASS, SlashEncodedDayPartitionValueExtractor.class.getName());
hiveSyncConfig.useJdbc = props.getBoolean(KafkaConnectConfigs.HIVE_USE_JDBC, true);
if (props.containsKey(KafkaConnectConfigs.HIVE_SYNC_MODE)) {
hiveSyncConfig.syncMode = props.getString(KafkaConnectConfigs.HIVE_SYNC_MODE);
}
hiveSyncConfig.autoCreateDatabase = props.getBoolean(KafkaConnectConfigs.HIVE_AUTO_CREATE_DATABASE, true);
hiveSyncConfig.ignoreExceptions = props.getBoolean(KafkaConnectConfigs.HIVE_IGNORE_EXCEPTIONS, false);
hiveSyncConfig.skipROSuffix = props.getBoolean(KafkaConnectConfigs.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE, false);
hiveSyncConfig.supportTimestamp = props.getBoolean(KafkaConnectConfigs.HIVE_SUPPORT_TIMESTAMP_TYPE, false);
return hiveSyncConfig;
}
}
@@ -30,22 +30,17 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -54,7 +49,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
@@ -167,43 +161,10 @@ private void syncMeta() {
if (connectConfigs.isMetaSyncEnabled()) {
Set<String> syncClientToolClasses = new HashSet<>(
Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
FileSystem fs = FSUtils.getFs(tableBasePath, new Configuration());
for (String impl : syncClientToolClasses) {
impl = impl.trim();
switch (impl) {
case "org.apache.hudi.hive.HiveSyncTool":
syncHive();
break;
default:
FileSystem fs = FSUtils.getFs(tableBasePath, new Configuration());
Properties properties = new Properties();
properties.putAll(connectConfigs.getProps());
properties.put("basePath", tableBasePath);
AbstractSyncTool syncTool = (AbstractSyncTool) ReflectionUtils.loadClass(impl, new Class[] {Properties.class, FileSystem.class}, properties, fs);
syncTool.syncHoodieTable();
}
SyncUtilHelpers.runHoodieMetaSync(impl.trim(), connectConfigs.getProps(), hadoopConf, fs, tableBasePath, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue());
}
}
}
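
SyncUtilHelpers.runHoodieMetaSync centralizes what each caller previously hand-rolled. Below is a rough sketch of the kind of dispatch such a helper performs; the constructor shapes and internals are assumptions for illustration only, and the real logic lives in hudi-sync-common:

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.sync.common.HoodieSyncConfig;

public class MetaSyncSketch {
  // Illustrative only: stash the table location in the properties, reflectively build the
  // requested sync tool (falling back to an older constructor shape), and run one sync round.
  static void runMetaSync(String toolClass, TypedProperties props, Configuration conf,
                          FileSystem fs, String basePath, String baseFileFormat) {
    props.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), basePath);
    props.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), baseFileFormat);
    Class<?>[] newerShape = new Class<?>[] {Properties.class, Configuration.class, FileSystem.class};
    AbstractSyncTool tool;
    if (ReflectionUtils.hasConstructor(toolClass, newerShape)) {
      tool = (AbstractSyncTool) ReflectionUtils.loadClass(toolClass, newerShape, props, conf, fs);
    } else {
      tool = (AbstractSyncTool) ReflectionUtils.loadClass(
          toolClass, new Class<?>[] {Properties.class, FileSystem.class}, props, fs);
    }
    tool.syncHoodieTable();
  }
}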

private void syncHive() {
HiveSyncConfig hiveSyncConfig = KafkaConnectUtils.buildSyncConfig(new TypedProperties(connectConfigs.getProps()), tableBasePath);
String url;
if (!StringUtils.isNullOrEmpty(hiveSyncConfig.syncMode) && HiveSyncMode.of(hiveSyncConfig.syncMode) == HiveSyncMode.HMS) {
url = hadoopConf.get(KafkaConnectConfigs.HIVE_METASTORE_URIS);
} else {
url = hiveSyncConfig.jdbcUrl;
}

LOG.info("Syncing target hoodie table with hive table("
+ hiveSyncConfig.tableName
+ "). Hive URL :"
+ url
+ ", basePath :" + tableBasePath);
LOG.info("Hive Sync Conf => " + hiveSyncConfig);
FileSystem fs = FSUtils.getFs(tableBasePath, hadoopConf);
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf());
LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
}
}