Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
import org.apache.hudi.keygen.CustomKeyGenerator;
Expand All @@ -57,6 +59,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -266,4 +269,32 @@ public static List<WriteStatus> getWriteStatuses(ControlMessage.ParticipantInfo
ControlMessage.ConnectWriteStatus connectWriteStatus = participantInfo.getWriteStatus();
return SerializationUtils.deserialize(connectWriteStatus.getSerializedWriteStatus().toByteArray());
}

/**
* Build Hive Sync Config
* Note: This method is a temporary solution.
* Future solutions can be referred to: https://issues.apache.org/jira/browse/HUDI-3199
*/
public static HiveSyncConfig buildSyncConfig(TypedProperties props, String tableBasePath) {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.basePath = tableBasePath;
hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(KafkaConnectConfigs.HIVE_USE_PRE_APACHE_INPUT_FORMAT, false);
hiveSyncConfig.databaseName = props.getString(KafkaConnectConfigs.HIVE_DATABASE, "default");
hiveSyncConfig.tableName = props.getString(KafkaConnectConfigs.HIVE_TABLE, "");
hiveSyncConfig.hiveUser = props.getString(KafkaConnectConfigs.HIVE_USER, "");
hiveSyncConfig.hivePass = props.getString(KafkaConnectConfigs.HIVE_PASS, "");
hiveSyncConfig.jdbcUrl = props.getString(KafkaConnectConfigs.HIVE_URL, "");
hiveSyncConfig.partitionFields = props.getStringList(KafkaConnectConfigs.HIVE_PARTITION_FIELDS, ",", Collections.emptyList());
hiveSyncConfig.partitionValueExtractorClass =
props.getString(KafkaConnectConfigs.HIVE_PARTITION_EXTRACTOR_CLASS, SlashEncodedDayPartitionValueExtractor.class.getName());
hiveSyncConfig.useJdbc = props.getBoolean(KafkaConnectConfigs.HIVE_USE_JDBC, true);
if (props.containsKey(KafkaConnectConfigs.HIVE_SYNC_MODE)) {
hiveSyncConfig.syncMode = props.getString(KafkaConnectConfigs.HIVE_SYNC_MODE);
}
hiveSyncConfig.autoCreateDatabase = props.getBoolean(KafkaConnectConfigs.HIVE_AUTO_CREATE_DATABASE, true);
hiveSyncConfig.ignoreExceptions = props.getBoolean(KafkaConnectConfigs.HIVE_IGNORE_EXCEPTIONS, false);
hiveSyncConfig.skipROSuffix = props.getBoolean(KafkaConnectConfigs.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE, false);
hiveSyncConfig.supportTimestamp = props.getBoolean(KafkaConnectConfigs.HIVE_SUPPORT_TIMESTAMP_TYPE, false);
return hiveSyncConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,22 @@ public String getHadoopConfHome() {
return getString(HADOOP_HOME);
}

public static final String HIVE_USE_PRE_APACHE_INPUT_FORMAT = "hoodie.datasource.hive_sync.use_pre_apache_input_format";
public static final String HIVE_DATABASE = "hoodie.datasource.hive_sync.database";
public static final String HIVE_TABLE = "hoodie.datasource.hive_sync.table";
public static final String HIVE_USER = "hoodie.datasource.hive_sync.username";
public static final String HIVE_PASS = "hoodie.datasource.hive_sync.password";
public static final String HIVE_URL = "hoodie.datasource.hive_sync.jdbcurl";
public static final String HIVE_PARTITION_FIELDS = "hoodie.datasource.hive_sync.partition_fields";
public static final String HIVE_PARTITION_EXTRACTOR_CLASS = "hoodie.datasource.hive_sync.partition_extractor_class";
public static final String HIVE_USE_JDBC = "hoodie.datasource.hive_sync.use_jdbc";
public static final String HIVE_SYNC_MODE = "hoodie.datasource.hive_sync.mode";
public static final String HIVE_AUTO_CREATE_DATABASE = "hoodie.datasource.hive_sync.auto_create_database";
public static final String HIVE_IGNORE_EXCEPTIONS = "hoodie.datasource.hive_sync.ignore_exceptions";
public static final String HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE = "hoodie.datasource.hive_sync.skip_ro_suffix";
public static final String HIVE_SUPPORT_TIMESTAMP_TYPE = "hoodie.datasource.hive_sync.support_timestamp";
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";

public static class Builder {

protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.hudi.connect.writers;

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
Expand All @@ -32,12 +31,14 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.sync.common.AbstractSyncTool;
Expand Down Expand Up @@ -163,9 +164,9 @@ private boolean isAsyncCompactionEnabled() {
}

private void syncMeta() {
Set<String> syncClientToolClasses = new HashSet<>(
Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
if (connectConfigs.isMetaSyncEnabled()) {
Set<String> syncClientToolClasses = new HashSet<>(
Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
for (String impl : syncClientToolClasses) {
impl = impl.trim();
switch (impl) {
Expand All @@ -185,16 +186,20 @@ private void syncMeta() {
}

private void syncHive() {
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(
new TypedProperties(connectConfigs.getProps()),
tableBasePath,
"PARQUET");
HiveSyncConfig hiveSyncConfig = KafkaConnectUtils.buildSyncConfig(new TypedProperties(connectConfigs.getProps()), tableBasePath);
String url;
if (!StringUtils.isNullOrEmpty(hiveSyncConfig.syncMode) && HiveSyncMode.of(hiveSyncConfig.syncMode) == HiveSyncMode.HMS) {
url = hadoopConf.get(KafkaConnectConfigs.HIVE_METASTORE_URIS);
} else {
url = hiveSyncConfig.jdbcUrl;
}

LOG.info("Syncing target hoodie table with hive table("
+ hiveSyncConfig.tableName
+ "). Hive metastore URL :"
+ hiveSyncConfig.jdbcUrl
+ "). Hive URL :"
+ url
+ ", basePath :" + tableBasePath);
LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString());
LOG.info("Hive Sync Conf => " + hiveSyncConfig);
FileSystem fs = FSUtils.getFs(tableBasePath, hadoopConf);
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf());
Expand Down