Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/hudi-defaults.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

# Example:
# hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
# hoodie.datasource.hive_sync.mode jdbc
# hoodie.datasource.hive_sync.use_jdbc true
# hoodie.datasource.hive_sync.support_timestamp false
# hoodie.index.type BLOOM
# hoodie.metadata.enable false
3 changes: 1 addition & 2 deletions docker/demo/config/hoodie-incr.properties
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,5 @@ hoodie.deltastreamer.source.hoodieincr.path=/docker_hoodie_sync_valid_test
hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
# hive sync
hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
hoodie.datasource.hive_sync.mode=jdbc
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000
hoodie.datasource.hive_sync.partition_fields=partition
hoodie.datasource.hive_sync.partition_fields=partition
2 changes: 0 additions & 2 deletions docker/demo/sparksql-incremental.commands
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").
option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
Expand Down Expand Up @@ -80,7 +79,6 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").
option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void testLoadGlobalConfFile() {
DFSPropertiesConfiguration.refreshGlobalProps();
assertEquals(5, DFSPropertiesConfiguration.getGlobalProps().size());
assertEquals("jdbc:hive2://localhost:10000", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.jdbcurl"));
assertEquals("jdbc", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.mode"));
assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.use_jdbc"));
assertEquals("false", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.support_timestamp"));
assertEquals("BLOOM", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.index.type"));
assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.metadata.enable"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

# Example:
hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
hoodie.datasource.hive_sync.mode jdbc
hoodie.datasource.hive_sync.use_jdbc true
hoodie.datasource.hive_sync.support_timestamp false
hoodie.index.type BLOOM
hoodie.metadata.enable true
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,12 @@ private FlinkOptions() {
.defaultValue(false)
.withDescription("Assume partitioning is yyyy/mm/dd, default false");

/**
 * Flink option {@code hive_sync.use_jdbc}: whether JDBC is used when hive synchronization is enabled.
 * Boolean-typed; defaults to {@code true}.
 */
public static final ConfigOption<Boolean> HIVE_SYNC_USE_JDBC = ConfigOptions
.key("hive_sync.use_jdbc")
.booleanType()
.defaultValue(true)
.withDescription("Use JDBC when hive synchronization is enabled, default true");

public static final ConfigOption<Boolean> HIVE_SYNC_AUTO_CREATE_DB = ConfigOptions
.key("hive_sync.auto_create_db")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT;
import static org.apache.hudi.hive.HiveSyncConfigHolder.METASTORE_URIS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
Expand Down Expand Up @@ -104,6 +105,7 @@ public static Properties buildSyncConfig(Configuration conf) {
props.setPropertyIfNonNull(HIVE_TABLE_SERDE_PROPERTIES.key(), conf.getString(FlinkOptions.HIVE_SYNC_TABLE_SERDE_PROPERTIES));
props.setPropertyIfNonNull(META_SYNC_PARTITION_FIELDS.key(), String.join(",", FilePathUtils.extractHivePartitionFields(conf)));
props.setPropertyIfNonNull(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME));
props.setPropertyIfNonNull(HIVE_USE_JDBC.key(), String.valueOf(conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC)));
props.setPropertyIfNonNull(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(conf.getBoolean(FlinkOptions.METADATA_ENABLED)));
props.setPropertyIfNonNull(HIVE_IGNORE_EXCEPTIONS.key(), String.valueOf(conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS)));
props.setPropertyIfNonNull(HIVE_SUPPORT_TIMESTAMP_TYPE.key(), String.valueOf(conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false")
public Boolean hiveSyncAssumeDatePartition = false;

/** Command-line flag {@code --hive-sync-use-jdbc}; initialized to {@code true} when the flag is not supplied. */
@Parameter(names = {"--hive-sync-use-jdbc"}, description = "Use JDBC when hive synchronization is enabled, default true")
public Boolean hiveSyncUseJdbc = true;

@Parameter(names = {"--hive-sync-auto-create-db"}, description = "Auto create hive database if it does not exists, default true")
public Boolean hiveSyncAutoCreateDb = true;

Expand Down Expand Up @@ -416,6 +419,7 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt
conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS, config.hiveSyncPartitionFields);
conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME, config.hiveSyncPartitionExtractorClass);
conf.setBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION, config.hiveSyncAssumeDatePartition);
conf.setBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC, config.hiveSyncUseJdbc);
conf.setBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB, config.hiveSyncAutoCreateDb);
conf.setBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS, config.hiveSyncIgnoreExceptions);
conf.setBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX, config.hiveSyncSkipRoSuffix);
Expand Down
1 change: 1 addition & 0 deletions hudi-kafka-connect/demo/config-sink-hive.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"hoodie.datasource.hive_sync.table": "huditesttopic",
"hoodie.datasource.hive_sync.partition_fields": "date",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"dfs.client.use.datanode.hostname": "true",
"hive.metastore.uris": "thrift://hivemetastore:9083",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ public String getHadoopConfHome() {
public static final String HIVE_URL = "hoodie.datasource.hive_sync.jdbcurl";
public static final String HIVE_PARTITION_FIELDS = "hoodie.datasource.hive_sync.partition_fields";
public static final String HIVE_PARTITION_EXTRACTOR_CLASS = "hoodie.datasource.hive_sync.partition_extractor_class";
public static final String HIVE_USE_JDBC = "hoodie.datasource.hive_sync.use_jdbc";
public static final String HIVE_SYNC_MODE = "hoodie.datasource.hive_sync.mode";
public static final String HIVE_AUTO_CREATE_DATABASE = "hoodie.datasource.hive_sync.auto_create_database";
public static final String HIVE_IGNORE_EXCEPTIONS = "hoodie.datasource.hive_sync.ignore_exceptions";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ object DataSourceWriteOptions {
val HIVE_ASSUME_DATE_PARTITION: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION
@Deprecated
val HIVE_USE_PRE_APACHE_INPUT_FORMAT: ConfigProperty[String] = HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT

/** @deprecated Use {@link HIVE_SYNC_MODE} instead of this config from 0.9.0 */
@Deprecated
val HIVE_USE_JDBC: ConfigProperty[String] = HiveSyncConfigHolder.HIVE_USE_JDBC
@Deprecated
val HIVE_AUTO_CREATE_DATABASE: ConfigProperty[String] = HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE
@Deprecated
Expand Down Expand Up @@ -496,6 +500,9 @@ object DataSourceWriteOptions {
/** @deprecated Use {@link HIVE_USE_PRE_APACHE_INPUT_FORMAT} and its methods instead */
@Deprecated
val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key()
/** @deprecated Use {@link HIVE_USE_JDBC} and its methods instead */
@Deprecated
val HIVE_USE_JDBC_OPT_KEY = HiveSyncConfigHolder.HIVE_USE_JDBC.key()
/** @deprecated Use {@link HIVE_AUTO_CREATE_DATABASE} and its methods instead */
@Deprecated
val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE.key()
Expand Down Expand Up @@ -686,6 +693,9 @@ object DataSourceWriteOptions {
val DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION.defaultValue()
@Deprecated
val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
/** @deprecated Use {@link HIVE_USE_JDBC} and its methods instead */
@Deprecated
val DEFAULT_HIVE_USE_JDBC_OPT_VAL = HiveSyncConfigHolder.HIVE_USE_JDBC.defaultValue()
/** @deprecated Use {@link HIVE_AUTO_CREATE_DATABASE} and its methods instead */
@Deprecated
val DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY = HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE.defaultValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ object HoodieWriterUtils {
hoodieConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS)
hoodieConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
hoodieConfig.setDefaultValue(HIVE_STYLE_PARTITIONING)
hoodieConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_USE_JDBC)
hoodieConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE)
hoodieConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_SYNC_AS_DATA_SOURCE_TABLE)
hoodieConfig.setDefaultValue(ASYNC_COMPACT_ENABLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT;
import static org.apache.hudi.hive.HiveSyncConfigHolder.METASTORE_URIS;

Expand Down Expand Up @@ -94,6 +95,9 @@ public static class HiveSyncConfigParams {
+ "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to "
+ "org.apache.hudi input format.")
public Boolean usePreApacheInputFormat;
/**
 * Deprecated command-line flag {@code --use-jdbc}. Left {@code null} when the flag is not supplied.
 * The help text previously read "Hive jdbc connect url", which described a URL rather than this
 * Boolean switch; it now matches the wording used for the corresponding config property.
 */
@Deprecated
@Parameter(names = {"--use-jdbc"}, description = "Use JDBC when hive synchronization is enabled")
public Boolean useJdbc;
@Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris")
public String metastoreUris;
@Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms,glue,jdbc and hiveql")
Expand Down Expand Up @@ -138,6 +142,7 @@ public TypedProperties toProps() {
props.setPropertyIfNonNull(HIVE_PASS.key(), hivePass);
props.setPropertyIfNonNull(HIVE_URL.key(), jdbcUrl);
props.setPropertyIfNonNull(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), usePreApacheInputFormat);
props.setPropertyIfNonNull(HIVE_USE_JDBC.key(), useJdbc);
props.setPropertyIfNonNull(HIVE_SYNC_MODE.key(), syncMode);
props.setPropertyIfNonNull(METASTORE_URIS.key(), metastoreUris);
props.setPropertyIfNonNull(HIVE_AUTO_CREATE_DATABASE.key(), autoCreateDatabase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ public class HiveSyncConfigHolder {
.withDocumentation("Flag to choose InputFormat under com.uber.hoodie package instead of org.apache.hudi package. "
+ "Use this when you are in the process of migrating from "
+ "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to org.apache.hudi input format");
/**
 * Legacy boolean-like switch (string-valued, default {@code "true"}) for using JDBC during hive sync.
 * When {@link #HIVE_SYNC_MODE} is null or empty, the sync client reads this flag to choose between
 * the JDBC executor ({@code true}) and the HiveQL executor ({@code false}).
 *
 * @deprecated Use {@link #HIVE_SYNC_MODE} instead of this config from 0.9.0
 */
@Deprecated
public static final ConfigProperty<String> HIVE_USE_JDBC = ConfigProperty
.key("hoodie.datasource.hive_sync.use_jdbc")
.defaultValue("true")
.deprecatedAfter("0.9.0")
.withDocumentation("Use JDBC when hive synchronization is enabled");
public static final ConfigProperty<String> METASTORE_URIS = ConfigProperty
.key("hoodie.datasource.hive_sync.metastore.uris")
.defaultValue("thrift://localhost:9083")
Expand Down Expand Up @@ -100,7 +109,7 @@ public class HiveSyncConfigHolder {
.withDocumentation("The number of partitions one batch when synchronous partitions to hive.");
public static final ConfigProperty<String> HIVE_SYNC_MODE = ConfigProperty
.key("hoodie.datasource.hive_sync.mode")
.defaultValue("jdbc")
.noDefaultValue()
.withDocumentation("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.");
public static final ConfigProperty<Boolean> HIVE_SYNC_BUCKET_SYNC = ConfigProperty
.key("hoodie.datasource.hive_sync.bucket_sync")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hive.ddl.DDLExecutor;
import org.apache.hudi.hive.ddl.HMSDDLExecutor;
Expand Down Expand Up @@ -48,6 +49,7 @@

import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;

Expand All @@ -70,19 +72,23 @@ public HoodieHiveSyncClient(HiveSyncConfig config) {
// Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should
// disable jdbc and depend on metastore client for all hive registrations
try {
HiveSyncMode syncMode = HiveSyncMode.of(config.getStringOrDefault(HIVE_SYNC_MODE));
switch (syncMode) {
case HMS:
ddlExecutor = new HMSDDLExecutor(config);
break;
case HIVEQL:
ddlExecutor = new HiveQueryDDLExecutor(config);
break;
case JDBC:
ddlExecutor = new JDBCExecutor(config);
break;
default:
throw new HoodieHiveSyncException("Invalid sync mode given " + config.getString(HIVE_SYNC_MODE));
if (!StringUtils.isNullOrEmpty(config.getString(HIVE_SYNC_MODE))) {
HiveSyncMode syncMode = HiveSyncMode.of(config.getString(HIVE_SYNC_MODE));
switch (syncMode) {
case HMS:
ddlExecutor = new HMSDDLExecutor(config);
break;
case HIVEQL:
ddlExecutor = new HiveQueryDDLExecutor(config);
break;
case JDBC:
ddlExecutor = new JDBCExecutor(config);
break;
default:
throw new HoodieHiveSyncException("Invalid sync mode given " + config.getString(HIVE_SYNC_MODE));
}
} else {
ddlExecutor = config.getBoolean(HIVE_USE_JDBC) ? new JDBCExecutor(config) : new HiveQueryDDLExecutor(config);
}
this.client = Hive.get(config.getHiveConf()).getMSC();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import static org.apache.hudi.sync.common.util.TableUtils.tableId;

/**
* This class offers DDL executor backed by the HiveQL Driver.
* This class offers DDL executor backed by the hive.ql Driver. This class preserves the old useJDBC = false way of doing things.
*/
public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import static org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;

/**
* This class offers DDL executor backed by the jdbc.
* This class offers DDL executor backed by the jdbc. This class preserves the old useJDBC = true way of doing things.
*/
public class JDBCExecutor extends QueryBasedDDLExecutor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.io.InputStream;
import java.util.Properties;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;

Expand Down Expand Up @@ -93,7 +92,6 @@ Properties mkGlobalHiveSyncProps(boolean forRemote) {
String jdbcUrl = forRemote ? loadedProps.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
: loadedProps.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, loadedProps.getProperty(HIVE_URL.key()));
props.setPropertyIfNonNull(HIVE_URL.key(), jdbcUrl);
props.setProperty(HIVE_SYNC_MODE.key(), "jdbc");
LOG.info("building hivesync config forRemote: " + forRemote + " " + jdbcUrl + " "
+ basePath);
return props;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private static Iterable<Object[]> syncModeAndSchemaFromCommitMetadataAndManagedT
return opts;
}

// (useSchemaFromCommitMetadata, syncAsDataSource, syncMode)
// (useJdbc, useSchemaFromCommitMetadata, syncAsDataSource)
private static Iterable<Object[]> syncDataSourceTableParams() {
List<Object[]> opts = new ArrayList<>();
for (Object mode : SYNC_MODES) {
Expand Down
Loading