7 changes: 7 additions & 0 deletions docker/demo/hive-table-check.commands
@@ -22,6 +22,13 @@ show partitions stock_ticks_cow;
show partitions stock_ticks_mor_ro;
show partitions stock_ticks_mor_rt;

show create table stock_ticks_cow;
show create table stock_ticks_mor_ro;
show create table stock_ticks_mor_rt;
show create table stock_ticks_cow_bs;
show create table stock_ticks_mor_bs_ro;
show create table stock_ticks_mor_bs_rt;

!quit


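The `show create table` statements added above let the demo verify that HiveSyncTool registered each table with Spark datasource metadata. As a rough, hypothetical companion check (not part of this PR; the Spark session, table names, and property check are assumptions based on the demo and the test assertion below), the same verification could be done from Spark SQL:

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

object DatasourceTableCheck {
  def main(args: Array[String]): Unit = {
    // Assumes a Spark session with Hive support pointed at the demo metastore.
    val spark = SparkSession.builder()
      .appName("hudi-datasource-table-check")
      .enableHiveSupport()
      .getOrCreate()

    val tables = Seq("stock_ticks_cow", "stock_ticks_mor_ro", "stock_ticks_mor_rt",
      "stock_ticks_cow_bs", "stock_ticks_mor_bs_ro", "stock_ticks_mor_bs_rt")

    tables.foreach { table =>
      // SHOW CREATE TABLE includes TBLPROPERTIES; a table synced as a Spark
      // datasource table carries the spark.sql.sources.provider=hudi property.
      val ddl = Try(spark.sql(s"SHOW CREATE TABLE $table")
        .collect().map(_.getString(0)).mkString("\n")).getOrElse("")
      println(s"$table synced as Spark datasource table: ${ddl.contains("spark.sql.sources.provider")}")
    }
    spark.stop()
  }
}
```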
@@ -253,6 +253,10 @@ private void testHiveAfterFirstBatch() throws Exception {
assertStdOutContains(stdOutErrPair,
"| partition |\n+----------------+\n| dt=2018-08-31 |\n+----------------+\n", 3);

// There should be 5 data source tables (all but stock_ticks_mor_bs_rt).
// After [HUDI-2071] is resolved, we can increase the number from 5 to 6.
assertStdOutContains(stdOutErrPair, "'spark.sql.sources.provider'='hudi'", 5);

stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS);
assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n+---------+----------------------+\n"
+ "| GOOG | 2018-08-31 10:29:00 |\n", 6);
@@ -355,7 +355,6 @@ object DataSourceWriteOptions {
// HIVE SYNC SPECIFIC CONFIGS
// NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
// unexpected issues with config getting reset

val HIVE_SYNC_ENABLED_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.enable")
.defaultValue("false")
@@ -442,16 +441,6 @@ object DataSourceWriteOptions {
.withDocumentation("‘INT64’ with original type TIMESTAMP_MICROS is converted to hive ‘timestamp’ type. " +
"Disabled by default for backward compatibility.")

val HIVE_TABLE_PROPERTIES: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.table_properties")
.noDefaultValue()
.withDocumentation("")

val HIVE_TABLE_SERDE_PROPERTIES: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.serde_properties")
.noDefaultValue()
.withDocumentation("")

val HIVE_SYNC_AS_DATA_SOURCE_TABLE: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.sync_as_datasource")
.defaultValue("true")
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.hudi.hive.util.ConfigUtils
Contributor: split it via an empty line

import org.apache.log4j.LogManager
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
@@ -105,8 +106,14 @@ class DefaultSource extends RelationProvider
val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
val tableType = metaClient.getTableType
val queryType = parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue)
log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType")

// First check whether ConfigUtils.IS_QUERY_AS_RO_TABLE has been set by HiveSyncTool;
// otherwise fall back to the query type from QUERY_TYPE_OPT_KEY.
val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
.map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
.getOrElse(parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue()))

log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")

(tableType, queryType, isBootstrappedTable) match {
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
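The queryType resolution added to DefaultSource above gives the flag stamped by HiveSyncTool precedence over an explicitly supplied query type. A minimal standalone sketch of that precedence rule follows; the literal key for the RO flag is a stand-in for ConfigUtils.IS_QUERY_AS_RO_TABLE rather than the exact constant:

```scala
object QueryTypeResolution {
  // Stand-ins for ConfigUtils.IS_QUERY_AS_RO_TABLE and the read-option constants;
  // the literal key string for the RO flag is illustrative, not the exact constant.
  val IsQueryAsRoTable = "hoodie.query.as.ro.table"
  val QueryTypeKey = "hoodie.datasource.query.type"
  val Snapshot = "snapshot"
  val ReadOptimized = "read_optimized"

  /** Resolve the effective query type from the table/read parameters. */
  def resolveQueryType(parameters: Map[String, String]): String =
    parameters.get(IsQueryAsRoTable)
      // If HiveSyncTool stamped the RO flag on the table, it wins.
      .map(flag => if (flag.toBoolean) ReadOptimized else Snapshot)
      // Otherwise honor the user-supplied query type, defaulting to snapshot.
      .getOrElse(parameters.getOrElse(QueryTypeKey, Snapshot))

  def main(args: Array[String]): Unit = {
    println(resolveQueryType(Map(IsQueryAsRoTable -> "true")))    // read_optimized
    println(resolveQueryType(Map(IsQueryAsRoTable -> "false")))   // snapshot
    println(resolveQueryType(Map(QueryTypeKey -> ReadOptimized))) // read_optimized
    println(resolveQueryType(Map.empty))                          // snapshot
  }
}
```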
@@ -36,7 +36,6 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.index.SparkHoodieIndex
import org.apache.hudi.internal.DataSourceInternalWriterHelper
@@ -48,11 +47,9 @@ import org.apache.spark.SPARK_VERSION
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
@@ -421,15 +418,15 @@ object HoodieSparkSqlWriter {
}
}

private def syncHive(basePath: Path, fs: FileSystem, hoodieConfig: HoodieConfig): Boolean = {
val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig)
private def syncHive(basePath: Path, fs: FileSystem, hoodieConfig: HoodieConfig, sqlConf: SQLConf): Boolean = {
val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig, sqlConf)
val hiveConf: HiveConf = new HiveConf()
hiveConf.addResource(fs.getConf)
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()
true
}

private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig): HiveSyncConfig = {
private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig, sqlConf: SQLConf): HiveSyncConfig = {
val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig()
hiveSyncConfig.basePath = basePath.toString
hiveSyncConfig.baseFileFormat = hoodieConfig.getString(HIVE_BASE_FILE_FORMAT_OPT_KEY)
@@ -454,85 +451,19 @@ object HoodieSparkSqlWriter {
hiveSyncConfig.decodePartition = hoodieConfig.getStringOrDefault(URL_ENCODE_PARTITIONING_OPT_KEY).toBoolean
hiveSyncConfig.batchSyncNum = hoodieConfig.getStringOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM).toInt

val syncAsDtaSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
if (syncAsDtaSourceTable) {
hiveSyncConfig.tableProperties = hoodieConfig.getStringOrDefault(HIVE_TABLE_PROPERTIES, null)
val serdePropText = createSqlTableSerdeProperties(hoodieConfig, basePath.toString)
val serdeProp = ConfigUtils.toMap(serdePropText)
serdeProp.put(ConfigUtils.SPARK_QUERY_TYPE_KEY, DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key)
serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)

hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp)
}
hiveSyncConfig.syncAsSparkDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
hiveSyncConfig.sparkSchemaLengthThreshold = sqlConf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD)
hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)
hiveSyncConfig
}

/**
* Add Spark Sql related table properties to the HIVE_TABLE_PROPERTIES.
* @param sqlConf The spark sql conf.
* @param schema The schema to write to the table.
* @param hoodieConfig The HoodieConfig contains origin parameters.
* @return A new parameters added the HIVE_TABLE_PROPERTIES property.
*/
private def addSqlTableProperties(sqlConf: SQLConf, schema: StructType,
hoodieConfig: HoodieConfig): HoodieConfig = {
// Convert the schema and partition info used by spark sql to hive table properties.
// The following code refers to the spark code in
// https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

// Sync schema with meta fields
val schemaWithMetaFields = HoodieSqlUtils.addMetaFields(schema)
val partitionSet = hoodieConfig.getString(HIVE_PARTITION_FIELDS_OPT_KEY)
.split(",").map(_.trim).filter(!_.isEmpty).toSet
val threshold = sqlConf.getConf(SCHEMA_STRING_LENGTH_THRESHOLD)

val (partitionCols, dataCols) = schemaWithMetaFields.partition(c => partitionSet.contains(c.name))
val reOrderedType = StructType(dataCols ++ partitionCols)
val schemaParts = reOrderedType.json.grouped(threshold).toSeq

var properties = Map(
"spark.sql.sources.provider" -> "hudi",
"spark.sql.sources.schema.numParts" -> schemaParts.size.toString
)
schemaParts.zipWithIndex.foreach { case (part, index) =>
properties += s"spark.sql.sources.schema.part.$index" -> part
}
// add partition columns
if (partitionSet.nonEmpty) {
properties += "spark.sql.sources.schema.numPartCols" -> partitionSet.size.toString
partitionSet.zipWithIndex.foreach { case (partCol, index) =>
properties += s"spark.sql.sources.schema.partCol.$index" -> partCol
}
}
var sqlPropertyText = ConfigUtils.configToString(properties)
sqlPropertyText = if (hoodieConfig.contains(HIVE_TABLE_PROPERTIES)) {
sqlPropertyText + "\n" + hoodieConfig.getString(HIVE_TABLE_PROPERTIES)
} else {
sqlPropertyText
}
hoodieConfig.setValue(HIVE_TABLE_PROPERTIES, sqlPropertyText)
hoodieConfig
}

private def createSqlTableSerdeProperties(hoodieConfig: HoodieConfig, basePath: String): String = {
val pathProp = s"path=$basePath"
if (hoodieConfig.contains(HIVE_TABLE_SERDE_PROPERTIES)) {
pathProp + "\n" + hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES)
} else {
pathProp
}
}

private def metaSync(spark: SparkSession, hoodieConfig: HoodieConfig, basePath: Path,
schema: StructType): Boolean = {
val hiveSyncEnabled = hoodieConfig.getStringOrDefault(HIVE_SYNC_ENABLED_OPT_KEY).toBoolean
var metaSyncEnabled = hoodieConfig.getStringOrDefault(META_SYNC_ENABLED_OPT_KEY).toBoolean
var syncClientToolClassSet = scala.collection.mutable.Set[String]()
hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS).split(",").foreach(syncClass => syncClientToolClassSet += syncClass)

val newHoodieConfig = addSqlTableProperties(spark.sessionState.conf, schema, hoodieConfig)
// for backward compatibility
if (hiveSyncEnabled) {
metaSyncEnabled = true
@@ -545,12 +476,12 @@ object HoodieSparkSqlWriter {
val syncSuccess = impl.trim match {
case "org.apache.hudi.hive.HiveSyncTool" => {
log.info("Syncing to Hive Metastore (URL: " + hoodieConfig.getString(HIVE_URL_OPT_KEY) + ")")
syncHive(basePath, fs, newHoodieConfig)
syncHive(basePath, fs, hoodieConfig, spark.sessionState.conf)
true
}
case _ => {
val properties = new Properties()
properties.putAll(newHoodieConfig.getProps)
properties.putAll(hoodieConfig.getProps)
properties.put("basePath", basePath.toString)
val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool]
syncHoodie.syncHoodieTable()
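With buildSyncConfig now reading HIVE_SYNC_AS_DATA_SOURCE_TABLE and the session's schema-length threshold directly, datasource-table sync is driven entirely by write options. A hedged sketch of a write that opts into it (the path, table name, and HiveServer2 URL are placeholders; hoodie.datasource.hive_sync.sync_as_datasource already defaults to true per the config above):

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object HiveSyncAsDatasourceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-hive-sync-example").getOrCreate()
    import spark.implicits._

    // Toy data; any DataFrame with a record key and a partition column would do.
    val df = Seq(("id1", "GOOG", "2018-08-31", 1230L)).toDF("_row_key", "symbol", "dt", "ts")

    df.write.format("hudi")
      .option("hoodie.table.name", "stock_ticks_cow")
      .option("hoodie.datasource.write.recordkey.field", "_row_key")
      .option("hoodie.datasource.write.partitionpath.field", "dt")
      // Enable Hive sync; the HiveServer2 URL is a placeholder.
      .option("hoodie.datasource.hive_sync.enable", "true")
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://hiveserver:10000")
      // Defaults to true: register the table with Spark datasource properties so
      // Spark SQL reads it through the Hudi relation rather than the Hive serde.
      .option("hoodie.datasource.hive_sync.sync_as_datasource", "true")
      .mode(SaveMode.Overwrite)
      .save("/tmp/hoodie_test/stock_ticks_cow")

    spark.stop()
  }
}
```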
@@ -37,8 +37,7 @@ import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUt
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions.{expr, lit}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}
@@ -538,11 +537,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {

test("Test build sync config for spark sql") {
initSparkContext("test build sync config")
val addSqlTablePropertiesMethod =
HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties",
classOf[SQLConf], classOf[StructType], classOf[HoodieConfig])
addSqlTablePropertiesMethod.setAccessible(true)

val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val basePath = "/tmp/hoodie_test"
@@ -555,49 +549,23 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
)
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
val newHoodieConfig = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter,
spark.sessionState.conf, structType, hoodieConfig)
.asInstanceOf[HoodieConfig]

val buildSyncConfigMethod =
HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path],
classOf[HoodieConfig])
classOf[HoodieConfig], classOf[SQLConf])
buildSyncConfigMethod.setAccessible(true)

val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
new Path(basePath), newHoodieConfig).asInstanceOf[HiveSyncConfig]

new Path(basePath), hoodieConfig, spark.sessionState.conf).asInstanceOf[HiveSyncConfig]
assertTrue(hiveSyncConfig.skipROSuffix)
assertTrue(hiveSyncConfig.createManagedTable)
assertResult("spark.sql.sources.provider=hudi\n" +
"spark.sql.sources.schema.partCol.0=partition\n" +
"spark.sql.sources.schema.numParts=1\n" +
"spark.sql.sources.schema.numPartCols=1\n" +
"spark.sql.sources.schema.part.0=" +
"{\"type\":\"struct\",\"fields\":[{\"name\":\"_hoodie_commit_time\"," +
"\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":" +
"\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," +
"{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," +
"{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," +
"{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," +
"{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," +
"{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," +
"{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties)
assertResult("path=/tmp/hoodie_test\n" +
"spark.query.type.key=hoodie.datasource.query.type\n" +
"spark.query.as.rt.key=snapshot\n" +
"spark.query.as.ro.key=read_optimized")(hiveSyncConfig.serdeProperties)
assertTrue(hiveSyncConfig.syncAsSparkDataSourceTable)
assertResult(spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD))(hiveSyncConfig.sparkSchemaLengthThreshold)
}

test("Test build sync config for skip Ro Suffix vals") {
initSparkContext("test build sync config for skip Ro suffix vals")
val addSqlTablePropertiesMethod =
HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties",
classOf[SQLConf], classOf[StructType], classOf[HoodieConfig])
addSqlTablePropertiesMethod.setAccessible(true)

val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val basePath = "/tmp/hoodie_test"
val params = Map(
"path" -> basePath,
@@ -606,18 +574,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
)
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
val newHoodieConfig = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter,
spark.sessionState.conf, structType, hoodieConfig)
.asInstanceOf[HoodieConfig]

val buildSyncConfigMethod =
HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path],
classOf[HoodieConfig])
classOf[HoodieConfig], classOf[SQLConf])
buildSyncConfigMethod.setAccessible(true)

val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
new Path(basePath), newHoodieConfig).asInstanceOf[HiveSyncConfig]

new Path(basePath), hoodieConfig, spark.sessionState.conf).asInstanceOf[HiveSyncConfig]
assertFalse(hiveSyncConfig.skipROSuffix)
}

6 changes: 6 additions & 0 deletions hudi-sync/hudi-hive-sync/pom.xml
@@ -150,6 +150,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<!-- Needed for running HiveServer for Tests -->
<dependency>
<groupId>org.eclipse.jetty.aggregate</groupId>
@@ -110,6 +110,12 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
public Integer batchSyncNum = 1000;

@Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.")
public Boolean syncAsSparkDataSourceTable = true;

@Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
public int sparkSchemaLengthThreshold = 4000;
Contributor: Is 4000 a special value here?

Author: It is the default value of the Spark conf spark.sql.sources.schemaStringLengthThreshold.

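For context on that threshold: the Hive metastore limits how much text fits in a single table-property cell, so the schema JSON is stored chopped into numbered parts no longer than the threshold (the same grouped(threshold) splitting used by the removed addSqlTableProperties code above). A minimal sketch, assuming a plain JSON schema string:

```scala
object SchemaPartSplitter {
  /**
   * Split a schema JSON string into spark.sql.sources.schema.part.N properties,
   * each at most `threshold` characters. 4000 mirrors Spark's default for
   * spark.sql.sources.schemaStringLengthThreshold.
   */
  def splitSchema(schemaJson: String, threshold: Int = 4000): Map[String, String] = {
    val parts = schemaJson.grouped(threshold).toSeq
    val partProps = parts.zipWithIndex.map { case (part, i) =>
      s"spark.sql.sources.schema.part.$i" -> part
    }.toMap
    partProps + ("spark.sql.sources.schema.numParts" -> parts.size.toString)
  }

  def main(args: Array[String]): Unit = {
    val fakeSchemaJson = """{"name":"ts","type":"long","nullable":true}""" * 200
    val props = splitSchema(fakeSchemaJson)
    println(props("spark.sql.sources.schema.numParts")) // count of <= 4000-char chunks
  }
}
```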

// enhance the similar function in child class
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -131,6 +137,8 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) {
newConfig.serdeProperties = cfg.serdeProperties;
newConfig.createManagedTable = cfg.createManagedTable;
newConfig.batchSyncNum = cfg.batchSyncNum;
newConfig.syncAsSparkDataSourceTable = cfg.syncAsSparkDataSourceTable;
newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold;
return newConfig;
}

@@ -160,6 +168,8 @@ public String toString() {
+ ", supportTimestamp=" + supportTimestamp
+ ", decodePartition=" + decodePartition
+ ", createManagedTable=" + createManagedTable
+ ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
+ ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
+ '}';
}
}
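The two new fields also apply when driving HiveSyncTool directly. A hedged sketch of a standalone sync mirroring the syncHive helper above; the metastore URL and base path are placeholders, and fields not shown in this diff (databaseName, tableName, jdbcUrl, partitionFields) are assumed from the existing HiveSyncConfig:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}

object StandaloneHiveSync {
  def main(args: Array[String]): Unit = {
    val basePath = "/tmp/hoodie_test/stock_ticks_cow" // placeholder table base path
    val hadoopConf = new Configuration()
    val fs = FileSystem.get(new Path(basePath).toUri, hadoopConf)

    val cfg = new HiveSyncConfig()
    cfg.basePath = basePath
    cfg.databaseName = "default"
    cfg.tableName = "stock_ticks_cow"
    cfg.jdbcUrl = "jdbc:hive2://hiveserver:10000" // placeholder HiveServer2 URL
    cfg.partitionFields = java.util.Arrays.asList("dt")
    // New in this PR: sync as a Spark datasource table, with schema cells
    // capped at a metastore-safe length.
    cfg.syncAsSparkDataSourceTable = true
    cfg.sparkSchemaLengthThreshold = 4000

    val hiveConf = new HiveConf()
    hiveConf.addResource(fs.getConf)
    new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable()
  }
}
```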