diff --git a/docker/demo/hive-table-check.commands b/docker/demo/hive-table-check.commands index 8cdf033a79565..1102ca53bbbd0 100644 --- a/docker/demo/hive-table-check.commands +++ b/docker/demo/hive-table-check.commands @@ -22,6 +22,13 @@ show partitions stock_ticks_cow; show partitions stock_ticks_mor_ro; show partitions stock_ticks_mor_rt; +show create table stock_ticks_cow; +show create table stock_ticks_mor_ro; +show create table stock_ticks_mor_rt; +show create table stock_ticks_cow_bs; +show create table stock_ticks_mor_bs_ro; +show create table stock_ticks_mor_bs_rt; + !quit diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java index c8de880bdd91a..655048c8a1b28 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java @@ -253,6 +253,10 @@ private void testHiveAfterFirstBatch() throws Exception { assertStdOutContains(stdOutErrPair, "| partition |\n+----------------+\n| dt=2018-08-31 |\n+----------------+\n", 3); + // There should be 5 data source tables; stock_ticks_mor_bs_rt is excluded for now. + // After [HUDI-2071] is resolved, we can increase the count from 5 to 6. + assertStdOutContains(stdOutErrPair, "'spark.sql.sources.provider'='hudi'", 5); + stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS); assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n+---------+----------------------+\n" + "| GOOG | 2018-08-31 10:29:00 |\n", 6); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index ce3683126c8a0..95b883abf90a1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -355,7 +355,6 @@ object DataSourceWriteOptions { // HIVE SYNC SPECIFIC CONFIGS // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes // unexpected issues with config getting reset - val HIVE_SYNC_ENABLED_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.hive_sync.enable") .defaultValue("false") @@ -442,16 +441,6 @@ object DataSourceWriteOptions { .withDocumentation("‘INT64’ with original type TIMESTAMP_MICROS is converted to hive ‘timestamp’ type. 
" + "Disabled by default for backward compatibility.") - val HIVE_TABLE_PROPERTIES: ConfigProperty[String] = ConfigProperty - .key("hoodie.datasource.hive_sync.table_properties") - .noDefaultValue() - .withDocumentation("") - - val HIVE_TABLE_SERDE_PROPERTIES: ConfigProperty[String] = ConfigProperty - .key("hoodie.datasource.hive_sync.serde_properties") - .noDefaultValue() - .withDocumentation("") - val HIVE_SYNC_AS_DATA_SOURCE_TABLE: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.hive_sync.sync_as_datasource") .defaultValue("true") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 1097c420971ff..146971e18661c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.HoodieROTablePathFilter +import org.apache.hudi.hive.util.ConfigUtils import org.apache.log4j.LogManager import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation} @@ -105,8 +106,14 @@ class DefaultSource extends RelationProvider val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build() val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent val tableType = metaClient.getTableType - val queryType = parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue) - log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType") + + // First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool, + // or else use query type from QUERY_TYPE_OPT_KEY. 
+ val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) + .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL) + .getOrElse(parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue())) + + log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType") (tableType, queryType, isBootstrappedTable) match { case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index b290533dd348b..e62a56957edab 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -36,7 +36,6 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieException import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory -import org.apache.hudi.hive.util.ConfigUtils import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} import org.apache.hudi.index.SparkHoodieIndex import org.apache.hudi.internal.DataSourceInternalWriterHelper @@ -48,11 +47,9 @@ import org.apache.spark.SPARK_VERSION import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.hudi.HoodieSqlUtils -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession} +import org.apache.spark.sql.{DataFrame, Dataset,Row, SQLContext, SaveMode, SparkSession} +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import scala.collection.JavaConversions._ import scala.collection.mutable.ListBuffer @@ -421,15 +418,15 @@ object HoodieSparkSqlWriter { } } - private def syncHive(basePath: Path, fs: FileSystem, hoodieConfig: HoodieConfig): Boolean = { - val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig) + private def syncHive(basePath: Path, fs: FileSystem, hoodieConfig: HoodieConfig, sqlConf: SQLConf): Boolean = { + val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig, sqlConf) val hiveConf: HiveConf = new HiveConf() hiveConf.addResource(fs.getConf) new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable() true } - private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig): HiveSyncConfig = { + private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig, sqlConf: SQLConf): HiveSyncConfig = { val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig() hiveSyncConfig.basePath = basePath.toString hiveSyncConfig.baseFileFormat = hoodieConfig.getString(HIVE_BASE_FILE_FORMAT_OPT_KEY) @@ -454,77 +451,12 @@ object HoodieSparkSqlWriter { hiveSyncConfig.decodePartition = hoodieConfig.getStringOrDefault(URL_ENCODE_PARTITIONING_OPT_KEY).toBoolean hiveSyncConfig.batchSyncNum = hoodieConfig.getStringOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM).toInt - val syncAsDtaSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean - if (syncAsDtaSourceTable) { - 
hiveSyncConfig.tableProperties = hoodieConfig.getStringOrDefault(HIVE_TABLE_PROPERTIES, null) - val serdePropText = createSqlTableSerdeProperties(hoodieConfig, basePath.toString) - val serdeProp = ConfigUtils.toMap(serdePropText) - serdeProp.put(ConfigUtils.SPARK_QUERY_TYPE_KEY, DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key) - serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) - serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) - - hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp) - } + hiveSyncConfig.syncAsSparkDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean + hiveSyncConfig.sparkSchemaLengthThreshold = sqlConf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD) hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE) hiveSyncConfig } - /** - * Add Spark Sql related table properties to the HIVE_TABLE_PROPERTIES. - * @param sqlConf The spark sql conf. - * @param schema The schema to write to the table. - * @param hoodieConfig The HoodieConfig contains origin parameters. - * @return A new parameters added the HIVE_TABLE_PROPERTIES property. - */ - private def addSqlTableProperties(sqlConf: SQLConf, schema: StructType, - hoodieConfig: HoodieConfig): HoodieConfig = { - // Convert the schema and partition info used by spark sql to hive table properties. - // The following code refers to the spark code in - // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala - - // Sync schema with meta fields - val schemaWithMetaFields = HoodieSqlUtils.addMetaFields(schema) - val partitionSet = hoodieConfig.getString(HIVE_PARTITION_FIELDS_OPT_KEY) - .split(",").map(_.trim).filter(!_.isEmpty).toSet - val threshold = sqlConf.getConf(SCHEMA_STRING_LENGTH_THRESHOLD) - - val (partitionCols, dataCols) = schemaWithMetaFields.partition(c => partitionSet.contains(c.name)) - val reOrderedType = StructType(dataCols ++ partitionCols) - val schemaParts = reOrderedType.json.grouped(threshold).toSeq - - var properties = Map( - "spark.sql.sources.provider" -> "hudi", - "spark.sql.sources.schema.numParts" -> schemaParts.size.toString - ) - schemaParts.zipWithIndex.foreach { case (part, index) => - properties += s"spark.sql.sources.schema.part.$index" -> part - } - // add partition columns - if (partitionSet.nonEmpty) { - properties += "spark.sql.sources.schema.numPartCols" -> partitionSet.size.toString - partitionSet.zipWithIndex.foreach { case (partCol, index) => - properties += s"spark.sql.sources.schema.partCol.$index" -> partCol - } - } - var sqlPropertyText = ConfigUtils.configToString(properties) - sqlPropertyText = if (hoodieConfig.contains(HIVE_TABLE_PROPERTIES)) { - sqlPropertyText + "\n" + hoodieConfig.getString(HIVE_TABLE_PROPERTIES) - } else { - sqlPropertyText - } - hoodieConfig.setValue(HIVE_TABLE_PROPERTIES, sqlPropertyText) - hoodieConfig - } - - private def createSqlTableSerdeProperties(hoodieConfig: HoodieConfig, basePath: String): String = { - val pathProp = s"path=$basePath" - if (hoodieConfig.contains(HIVE_TABLE_SERDE_PROPERTIES)) { - pathProp + "\n" + hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES) - } else { - pathProp - } - } - private def metaSync(spark: SparkSession, hoodieConfig: HoodieConfig, basePath: Path, schema: StructType): Boolean = { val hiveSyncEnabled = 
hoodieConfig.getStringOrDefault(HIVE_SYNC_ENABLED_OPT_KEY).toBoolean @@ -532,7 +464,6 @@ object HoodieSparkSqlWriter { var syncClientToolClassSet = scala.collection.mutable.Set[String]() hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS).split(",").foreach(syncClass => syncClientToolClassSet += syncClass) - val newHoodieConfig = addSqlTableProperties(spark.sessionState.conf, schema, hoodieConfig) // for backward compatibility if (hiveSyncEnabled) { metaSyncEnabled = true @@ -545,12 +476,12 @@ object HoodieSparkSqlWriter { val syncSuccess = impl.trim match { case "org.apache.hudi.hive.HiveSyncTool" => { log.info("Syncing to Hive Metastore (URL: " + hoodieConfig.getString(HIVE_URL_OPT_KEY) + ")") - syncHive(basePath, fs, newHoodieConfig) + syncHive(basePath, fs, hoodieConfig, spark.sessionState.conf) true } case _ => { val properties = new Properties() - properties.putAll(newHoodieConfig.getProps) + properties.putAll(hoodieConfig.getProps) properties.put("basePath", basePath.toString) val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool] syncHoodie.syncHoodieTable() diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala index d37dac444cf39..6f5f6e699359b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala @@ -37,8 +37,7 @@ import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUt import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.functions.{expr, lit} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{spy, times, verify} @@ -538,11 +537,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { test("Test build sync config for spark sql") { initSparkContext("test build sync config") - val addSqlTablePropertiesMethod = - HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties", - classOf[SQLConf], classOf[StructType], classOf[HoodieConfig]) - addSqlTablePropertiesMethod.setAccessible(true) - val schema = DataSourceTestUtils.getStructTypeExampleSchema val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val basePath = "/tmp/hoodie_test" @@ -555,49 +549,23 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { ) val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params) val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters) - val newHoodieConfig = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter, - spark.sessionState.conf, structType, hoodieConfig) - .asInstanceOf[HoodieConfig] val buildSyncConfigMethod = HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path], - classOf[HoodieConfig]) + classOf[HoodieConfig], classOf[SQLConf]) buildSyncConfigMethod.setAccessible(true) val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter, - new Path(basePath), 
newHoodieConfig).asInstanceOf[HiveSyncConfig] - + new Path(basePath), hoodieConfig, spark.sessionState.conf).asInstanceOf[HiveSyncConfig] assertTrue(hiveSyncConfig.skipROSuffix) assertTrue(hiveSyncConfig.createManagedTable) - assertResult("spark.sql.sources.provider=hudi\n" + - "spark.sql.sources.schema.partCol.0=partition\n" + - "spark.sql.sources.schema.numParts=1\n" + - "spark.sql.sources.schema.numPartCols=1\n" + - "spark.sql.sources.schema.part.0=" + - "{\"type\":\"struct\",\"fields\":[{\"name\":\"_hoodie_commit_time\"," + - "\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":" + - "\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + - "{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + - "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + - "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + - "{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," + - "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," + - "{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties) - assertResult("path=/tmp/hoodie_test\n" + - "spark.query.type.key=hoodie.datasource.query.type\n" + - "spark.query.as.rt.key=snapshot\n" + - "spark.query.as.ro.key=read_optimized")(hiveSyncConfig.serdeProperties) + assertTrue(hiveSyncConfig.syncAsSparkDataSourceTable) + assertResult(spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD))(hiveSyncConfig.sparkSchemaLengthThreshold) } test("Test build sync config for skip Ro Suffix vals") { initSparkContext("test build sync config for skip Ro suffix vals") - val addSqlTablePropertiesMethod = - HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties", - classOf[SQLConf], classOf[StructType], classOf[HoodieConfig]) - addSqlTablePropertiesMethod.setAccessible(true) - val schema = DataSourceTestUtils.getStructTypeExampleSchema - val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val basePath = "/tmp/hoodie_test" val params = Map( "path" -> basePath, @@ -606,18 +574,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { ) val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params) val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters) - val newHoodieConfig = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter, - spark.sessionState.conf, structType, hoodieConfig) - .asInstanceOf[HoodieConfig] val buildSyncConfigMethod = HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path], - classOf[HoodieConfig]) + classOf[HoodieConfig], classOf[SQLConf]) buildSyncConfigMethod.setAccessible(true) val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter, - new Path(basePath), newHoodieConfig).asInstanceOf[HiveSyncConfig] - + new Path(basePath), hoodieConfig, spark.sessionState.conf).asInstanceOf[HiveSyncConfig] assertFalse(hiveSyncConfig.skipROSuffix) } diff --git a/hudi-sync/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml index fd63028951047..03e8b3e3701ea 100644 --- a/hudi-sync/hudi-hive-sync/pom.xml +++ b/hudi-sync/hudi-hive-sync/pom.xml @@ -150,6 +150,12 @@ test + + org.apache.spark + spark-sql_${scala.binary.version} + test + + org.eclipse.jetty.aggregate diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java index 09c3e7b354f2b..47f23864c01ca 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java @@ -110,6 +110,12 @@ public class HiveSyncConfig implements Serializable { @Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive") public Integer batchSyncNum = 1000; + @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.") + public Boolean syncAsSparkDataSourceTable = true; + + @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.") + public int sparkSchemaLengthThreshold = 4000; + // enhance the similar function in child class public static HiveSyncConfig copy(HiveSyncConfig cfg) { HiveSyncConfig newConfig = new HiveSyncConfig(); @@ -131,6 +137,8 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) { newConfig.serdeProperties = cfg.serdeProperties; newConfig.createManagedTable = cfg.createManagedTable; newConfig.batchSyncNum = cfg.batchSyncNum; + newConfig.syncAsSparkDataSourceTable = cfg.syncAsSparkDataSourceTable; + newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold; return newConfig; } @@ -160,6 +168,8 @@ public String toString() { + ", supportTimestamp=" + supportTimestamp + ", decodePartition=" + decodePartition + ", createManagedTable=" + createManagedTable + + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable + + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold + '}'; } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 7264c8dffea9d..88efabea8fe24 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -20,11 +20,13 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.InvalidTableException; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hive.util.ConfigUtils; +import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; import org.apache.hudi.hive.util.HiveSchemaUtil; @@ -37,13 +39,20 @@ import org.apache.hudi.sync.common.AbstractSyncTool; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static org.apache.parquet.schema.OriginalType.UTF8; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; + /** * Tool to sync a hoodie HDFS table with a hive metastore table. 
Either use it as a api * HiveSyncTool.syncHoodieTable(HiveSyncConfig) or as a command line java -cp hoodie-hive-sync.jar HiveSyncTool [args] @@ -152,6 +161,16 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, // Get the parquet schema for this table looking at the latest commit MessageType schema = hoodieHiveClient.getDataSchema(); + + // Currently HoodieBootstrapRelation does not support reading the bootstrap MOR rt table, + // so we disable syncAsSparkDataSourceTable here to avoid reading such a table + // through the data source path (which would use HoodieBootstrapRelation). + // TODO: once the bootstrap MOR rt table is supported in HoodieBootstrapRelation [HUDI-2071], we can remove this logic. + if (hoodieHiveClient.isBootstrap() + && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ + && !readAsOptimized) { + cfg.syncAsSparkDataSourceTable = false; + } // Sync schema if needed syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema); @@ -180,6 +199,15 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, */ private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, boolean readAsOptimized, MessageType schema) { + // Append spark table properties & serde properties + Map tableProperties = ConfigUtils.toMap(cfg.tableProperties); + Map serdeProperties = ConfigUtils.toMap(cfg.serdeProperties); + if (cfg.syncAsSparkDataSourceTable) { + Map sparkTableProperties = getSparkTableProperties(cfg.sparkSchemaLengthThreshold, schema); + Map sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized); + tableProperties.putAll(sparkTableProperties); + serdeProperties.putAll(sparkSerdeProperties); + } // Check and sync schema if (!tableExists) { LOG.info("Hive table " + tableName + " is not found. Creating it"); @@ -196,27 +224,11 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat); String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat); - Map serdeProperties = ConfigUtils.toMap(cfg.serdeProperties); - - // The serdeProperties is non-empty only for spark sync meta data currently.
- if (!serdeProperties.isEmpty()) { - String queryTypeKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_TYPE_KEY); - String queryAsROKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_AS_RO_KEY); - String queryAsRTKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_AS_RT_KEY); - - if (queryTypeKey != null && queryAsROKey != null && queryAsRTKey != null) { - if (readAsOptimized) { // read optimized - serdeProperties.put(queryTypeKey, queryAsROKey); - } else { // read snapshot - serdeProperties.put(queryTypeKey, queryAsRTKey); - } - } - } // Custom serde will not work with ALTER TABLE REPLACE COLUMNS // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive // /ql/exec/DDLTask.java#L3488 hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, - outputFormatClassName, serDeFormatClassName, serdeProperties, ConfigUtils.toMap(cfg.tableProperties)); + outputFormatClassName, serDeFormatClassName, serdeProperties, tableProperties); } else { // Check if the table schema has evolved Map tableSchema = hoodieHiveClient.getTableSchema(tableName); @@ -226,7 +238,6 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi hoodieHiveClient.updateTableDefinition(tableName, schema); // Sync the table properties if the schema has changed if (cfg.tableProperties != null) { - Map tableProperties = ConfigUtils.toMap(cfg.tableProperties); hoodieHiveClient.updateTableProperties(tableName, tableProperties); LOG.info("Sync table properties for " + tableName + ", table properties is: " + cfg.tableProperties); } @@ -236,6 +247,72 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi } } + /** + * Get the Spark SQL related table properties. This is used for the Spark datasource table. + * @param schema The schema to write to the table. + * @return A map of the Spark SQL related table properties. + */ + private Map getSparkTableProperties(int schemaLengthThreshold, MessageType schema) { + // Convert the schema and partition info used by spark sql to hive table properties. + // The following code refers to the spark code in + // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala + GroupType originGroupType = schema.asGroupType(); + List partitionNames = cfg.partitionFields; + List partitionCols = new ArrayList<>(); + List dataCols = new ArrayList<>(); + Map column2Field = new HashMap<>(); + + for (Type field : originGroupType.getFields()) { + column2Field.put(field.getName(), field); + } + // Get partition columns and data columns. + for (String partitionName : partitionNames) { + // Default the unknown partition fields to be String. + // Keep the same logic as HiveSchemaUtil#getPartitionKeyType. + partitionCols.add(column2Field.getOrDefault(partitionName, + new PrimitiveType(Type.Repetition.REQUIRED, BINARY, partitionName, UTF8))); + } + + for (Type field : originGroupType.getFields()) { + if (!partitionNames.contains(field.getName())) { + dataCols.add(field); + } + } + + List reOrderedFields = new ArrayList<>(); + reOrderedFields.addAll(dataCols); + reOrderedFields.addAll(partitionCols); + GroupType reOrderedType = new GroupType(originGroupType.getRepetition(), originGroupType.getName(), reOrderedFields); + + Map sparkProperties = new HashMap<>(); + sparkProperties.put("spark.sql.sources.provider", "hudi"); + // Split the schema string into multiple parts according to the schemaLengthThreshold size.
+ String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType); + int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold; + sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart)); + // Add each part of schema string to sparkProperties + for (int i = 0; i < numSchemaPart; i++) { + int start = i * schemaLengthThreshold; + int end = Math.min(start + schemaLengthThreshold, schemaString.length()); + sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end)); + } + // Add partition columns + if (!partitionNames.isEmpty()) { + sparkProperties.put("spark.sql.sources.schema.numPartCols", String.valueOf(partitionNames.size())); + for (int i = 0; i < partitionNames.size(); i++) { + sparkProperties.put("spark.sql.sources.schema.partCol." + i, partitionNames.get(i)); + } + } + return sparkProperties; + } + + private Map getSparkSerdeProperties(boolean readAsOptimized) { + Map sparkSerdeProperties = new HashMap<>(); + sparkSerdeProperties.put("path", cfg.basePath); + sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized)); + return sparkSerdeProperties; + } + /** * Syncs the list of storage parititions passed in (checks if the partition is in hive, if not adds it or if the * partition path does not match, it updates the partition path). diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java index b8745b6e30807..94ebdaadd8ff3 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java @@ -23,12 +23,11 @@ import org.apache.hudi.common.util.StringUtils; public class ConfigUtils { - - public static final String SPARK_QUERY_TYPE_KEY = "spark.query.type.key"; - - public static final String SPARK_QUERY_AS_RO_KEY = "spark.query.as.ro.key"; - - public static final String SPARK_QUERY_AS_RT_KEY = "spark.query.as.rt.key"; + /** + * Config stored in hive serde properties to tell query engine (spark/flink) to + * read the table as a read-optimized table when this config is true. + */ + public static final String IS_QUERY_AS_RO_TABLE = "hoodie.query.as.ro.table"; /** * Convert the key-value config to a map.The format of the config diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/Parquet2SparkSchemaUtils.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/Parquet2SparkSchemaUtils.java new file mode 100644 index 0000000000000..debc262b5518c --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/Parquet2SparkSchemaUtils.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; + +/** + * Convert the parquet schema to spark schema' json string. + * This code is refer to org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter + * in spark project. + */ +public class Parquet2SparkSchemaUtils { + + public static String convertToSparkSchemaJson(GroupType parquetSchema) { + String fieldsJsonString = parquetSchema.getFields().stream().map(field -> { + switch (field.getRepetition()) { + case OPTIONAL: + return "{\"name\":\"" + field.getName() + "\",\"type\":" + convertFieldType(field) + + ",\"nullable\":true,\"metadata\":{}}"; + case REQUIRED: + return "{\"name\":\"" + field.getName() + "\",\"type\":" + convertFieldType(field) + + ",\"nullable\":false,\"metadata\":{}}"; + case REPEATED: + String arrayType = arrayType(field, false); + return "{\"name\":\"" + field.getName() + "\",\"type\":" + arrayType + + ",\"nullable\":false,\"metadata\":{}}"; + default: + throw new UnsupportedOperationException("Unsupport convert " + field + " to spark sql type"); + } + }).reduce((a, b) -> a + "," + b).orElse(""); + return "{\"type\":\"struct\",\"fields\":[" + fieldsJsonString + "]}"; + } + + private static String convertFieldType(Type field) { + if (field instanceof PrimitiveType) { + return "\"" + convertPrimitiveType((PrimitiveType) field) + "\""; + } else { + assert field instanceof GroupType; + return convertGroupField((GroupType) field); + } + } + + private static String convertPrimitiveType(PrimitiveType field) { + PrimitiveType.PrimitiveTypeName typeName = field.getPrimitiveTypeName(); + OriginalType originalType = field.getOriginalType(); + + switch (typeName) { + case BOOLEAN: return "boolean"; + case FLOAT: return "float"; + case DOUBLE: return "double"; + case INT32: + if (originalType == null) { + return "integer"; + } + switch (originalType) { + case INT_8: return "byte"; + case INT_16: return "short"; + case INT_32: return "integer"; + case DATE: return "date"; + case DECIMAL: + return "decimal(" + field.getDecimalMetadata().getPrecision() + "," + + field.getDecimalMetadata().getScale() + ")"; + default: throw new UnsupportedOperationException("Unsupport convert " + typeName + " to spark sql type"); + } + case INT64: + if (originalType == null) { + return "long"; + } + switch (originalType) { + case INT_64: return "long"; + case DECIMAL: + return "decimal(" + field.getDecimalMetadata().getPrecision() + "," + + field.getDecimalMetadata().getScale() + ")"; + case TIMESTAMP_MICROS: + case TIMESTAMP_MILLIS: + return "timestamp"; + default: + throw new UnsupportedOperationException("Unsupport convert " + typeName + " to spark sql type"); + } + case INT96: return "timestamp"; + + case BINARY: + if (originalType == null) { + return "binary"; + } + switch (originalType) { + case UTF8: + case ENUM: + case JSON: + return "string"; + case BSON: return "binary"; + case DECIMAL: + return "decimal(" + field.getDecimalMetadata().getPrecision() + "," + + field.getDecimalMetadata().getScale() + ")"; + default: + throw new UnsupportedOperationException("Unsupport convert " + typeName + " to spark sql type"); + } + + 
case FIXED_LEN_BYTE_ARRAY: + switch (originalType) { + case DECIMAL: + return "decimal(" + field.getDecimalMetadata().getPrecision() + "," + + field.getDecimalMetadata().getScale() + ")"; + default: + throw new UnsupportedOperationException("Unsupport convert " + typeName + " to spark sql type"); + } + default: + throw new UnsupportedOperationException("Unsupport convert " + typeName + " to spark sql type"); + } + } + + private static String convertGroupField(GroupType field) { + if (field.getOriginalType() == null) { + return convertToSparkSchemaJson(field); + } + switch (field.getOriginalType()) { + case LIST: + ValidationUtils.checkArgument(field.getFieldCount() == 1, "Illegal List type: " + field); + Type repeatedType = field.getType(0); + if (isElementType(repeatedType, field.getName())) { + return arrayType(repeatedType, false); + } else { + Type elementType = repeatedType.asGroupType().getType(0); + boolean optional = elementType.isRepetition(OPTIONAL); + return arrayType(elementType, optional); + } + case MAP: + case MAP_KEY_VALUE: + GroupType keyValueType = field.getType(0).asGroupType(); + Type keyType = keyValueType.getType(0); + Type valueType = keyValueType.getType(1); + boolean valueOptional = valueType.isRepetition(OPTIONAL); + return "{\"type\":\"map\", \"keyType\":" + convertFieldType(keyType) + + ",\"valueType\":" + convertFieldType(valueType) + + ",\"valueContainsNull\":" + valueOptional + "}"; + default: + throw new UnsupportedOperationException("Unsupport convert " + field + " to spark sql type"); + } + } + + private static String arrayType(Type elementType, boolean containsNull) { + return "{\"type\":\"array\", \"elementType\":" + convertFieldType(elementType) + ",\"containsNull\":" + containsNull + "}"; + } + + private static boolean isElementType(Type repeatedType, String parentName) { + return repeatedType.isPrimitive() || repeatedType.asGroupType().getFieldCount() > 1 + || repeatedType.getName().equals("array") || repeatedType.getName().equals(parentName + "_tuple"); + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index c4125337ea9d8..3494e4451fbab 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -42,7 +42,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; - import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; @@ -70,6 +69,11 @@ private static Iterable useJdbcAndSchemaFromCommitMetadataAndManagedTa return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}}); } + // (useJdbc, useSchemaFromCommitMetadata, syncAsDataSource) + private static Iterable syncDataSourceTableParams() { + return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}}); + } + @BeforeEach public void setUp() throws Exception { HiveTestUtil.setUp(); @@ -157,17 +161,15 @@ public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) } @ParameterizedTest - @MethodSource({"useJdbcAndSchemaFromCommitMetadata"}) + @MethodSource({"syncDataSourceTableParams"}) public void testSyncCOWTableWithProperties(boolean useJdbc, - boolean useSchemaFromCommitMetadata) 
throws Exception { + boolean useSchemaFromCommitMetadata, + boolean syncAsDataSourceTable) throws Exception { HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; Map serdeProperties = new HashMap() { { put("path", hiveSyncConfig.basePath); - put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type"); - put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized"); - put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot"); } }; @@ -177,6 +179,7 @@ public void testSyncCOWTableWithProperties(boolean useJdbc, put("tp_1", "p1"); } }; + hiveSyncConfig.syncAsSparkDataSourceTable = syncAsDataSourceTable; hiveSyncConfig.useJdbc = useJdbc; hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties); hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties); @@ -195,9 +198,12 @@ public void testSyncCOWTableWithProperties(boolean useJdbc, String tblPropertiesWithoutDdlTime = String.join("\n", results.subList(0, results.size() - 1)); + + String sparkTableProperties = getSparkTableProperties(syncAsDataSourceTable, useSchemaFromCommitMetadata); assertEquals( "EXTERNAL\tTRUE\n" + "last_commit_time_sync\t100\n" + + sparkTableProperties + "tp_0\tp0\n" + "tp_1\tp1", tblPropertiesWithoutDdlTime); assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime")); @@ -208,21 +214,54 @@ public void testSyncCOWTableWithProperties(boolean useJdbc, hiveDriver.getResults(results); String ddl = String.join("\n", results); assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'")); - assertTrue(ddl.contains("'hoodie.datasource.query.type'='snapshot'")); + if (syncAsDataSourceTable) { + assertTrue(ddl.contains("'" + ConfigUtils.IS_QUERY_AS_RO_TABLE + "'='false'")); + } + } + + private String getSparkTableProperties(boolean syncAsDataSourceTable, boolean useSchemaFromCommitMetadata) { + if (syncAsDataSourceTable) { + if (useSchemaFromCommitMetadata) { + return "spark.sql.sources.provider\thudi\n" + + "spark.sql.sources.schema.numPartCols\t1\n" + + "spark.sql.sources.schema.numParts\t1\n" + + "spark.sql.sources.schema.part.0\t{\"type\":\"struct\",\"fields\":" + + "[{\"name\":\"_hoodie_commit_time\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," + + "{\"name\":\"favorite_number\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}," + + "{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," + + "{\"name\":\"datestr\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}\n" + + "spark.sql.sources.schema.partCol.0\tdatestr\n"; + } else { + return "spark.sql.sources.provider\thudi\n" + + "spark.sql.sources.schema.numPartCols\t1\n" + + "spark.sql.sources.schema.numParts\t1\n" + + "spark.sql.sources.schema.part.0\t{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":" + + "\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"favorite_number\",\"type\":\"integer\"," + + "\"nullable\":false,\"metadata\":{}},{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false," + + 
"\"metadata\":{}}]}\n" + + "{\"name\":\"datestr\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}\n" + + "spark.sql.sources.schema.partCol.0\tdatestr\n"; + } + } else { + return ""; + } } @ParameterizedTest - @MethodSource({"useJdbcAndSchemaFromCommitMetadata"}) + @MethodSource({"syncDataSourceTableParams"}) public void testSyncMORTableWithProperties(boolean useJdbc, - boolean useSchemaFromCommitMetadata) throws Exception { + boolean useSchemaFromCommitMetadata, + boolean syncAsDataSourceTable) throws Exception { HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; Map serdeProperties = new HashMap() { { put("path", hiveSyncConfig.basePath); - put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type"); - put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized"); - put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot"); } }; @@ -232,6 +271,7 @@ public void testSyncMORTableWithProperties(boolean useJdbc, put("tp_1", "p1"); } }; + hiveSyncConfig.syncAsSparkDataSourceTable = syncAsDataSourceTable; hiveSyncConfig.useJdbc = useJdbc; hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties); hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties); @@ -247,14 +287,15 @@ public void testSyncMORTableWithProperties(boolean useJdbc, String rtTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; String[] tableNames = new String[] {roTableName, rtTableName}; - String[] expectQueryTypes = new String[] {"read_optimized", "snapshot"}; + String[] readAsOptimizedResults = new String[] {"true", "false"}; SessionState.start(HiveTestUtil.getHiveConf()); Driver hiveDriver = new org.apache.hadoop.hive.ql.Driver(HiveTestUtil.getHiveConf()); + String sparkTableProperties = getSparkTableProperties(syncAsDataSourceTable, useSchemaFromCommitMetadata); for (int i = 0;i < 2; i++) { String dbTableName = hiveSyncConfig.databaseName + "." + tableNames[i]; - String expectQueryType = expectQueryTypes[i]; + String readAsOptimized = readAsOptimizedResults[i]; hiveDriver.run("SHOW TBLPROPERTIES " + dbTableName); List results = new ArrayList<>(); @@ -265,6 +306,7 @@ public void testSyncMORTableWithProperties(boolean useJdbc, assertEquals( "EXTERNAL\tTRUE\n" + "last_commit_time_sync\t101\n" + + sparkTableProperties + "tp_0\tp0\n" + "tp_1\tp1", tblPropertiesWithoutDdlTime); assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime")); @@ -275,8 +317,10 @@ public void testSyncMORTableWithProperties(boolean useJdbc, hiveDriver.getResults(results); String ddl = String.join("\n", results); assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'")); - assertTrue(ddl.contains("'hoodie.datasource.query.type'='" + expectQueryType + "'")); assertTrue(ddl.toLowerCase().contains("create external table")); + if (syncAsDataSourceTable) { + assertTrue(ddl.contains("'" + ConfigUtils.IS_QUERY_AS_RO_TABLE + "'='" + readAsOptimized + "'")); + } } } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java new file mode 100644 index 0000000000000..15575c4607b27 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive; + +import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils; +import org.apache.spark.sql.execution.SparkSqlParser; +import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.IntegerType$; +import org.apache.spark.sql.types.StringType$; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestParquet2SparkSchemaUtils { + private final SparkToParquetSchemaConverter spark2ParquetConverter = + new SparkToParquetSchemaConverter( + (Boolean) SQLConf.PARQUET_WRITE_LEGACY_FORMAT().defaultValue().get(), + SQLConf.ParquetOutputTimestampType$.MODULE$.INT96()); + private final SparkSqlParser parser = new SparkSqlParser(new SQLConf()); + + @Test + public void testConvertPrimitiveType() { + StructType sparkSchema = parser.parseTableSchema( + "f0 int, f1 string, f3 bigint," + + " f4 decimal(5,2), f5 timestamp, f6 date," + + " f7 short, f8 float, f9 double, f10 byte," + + " f11 tinyint, f12 smallint, f13 binary, f14 boolean"); + + String sparkSchemaJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( + spark2ParquetConverter.convert(sparkSchema).asGroupType()); + StructType convertedSparkSchema = (StructType) StructType.fromJson(sparkSchemaJson); + assertEquals(sparkSchema.json(), convertedSparkSchema.json()); + // Test type with nullable + StructField field0 = new StructField("f0", StringType$.MODULE$, false, Metadata.empty()); + StructField field1 = new StructField("f1", StringType$.MODULE$, true, Metadata.empty()); + StructType sparkSchemaWithNullable = new StructType(new StructField[]{field0, field1}); + String sparkSchemaWithNullableJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( + spark2ParquetConverter.convert(sparkSchemaWithNullable).asGroupType()); + StructType convertedSparkSchemaWithNullable = (StructType) StructType.fromJson(sparkSchemaWithNullableJson); + assertEquals(sparkSchemaWithNullable.json(), convertedSparkSchemaWithNullable.json()); + } + + @Test + public void testConvertComplexType() { + StructType sparkSchema = parser.parseTableSchema( + "f0 int, f1 map, f2 array" + + ",f3 map, bigint>, f4 array>" + + ",f5 struct"); + String sparkSchemaJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( + spark2ParquetConverter.convert(sparkSchema).asGroupType()); + StructType convertedSparkSchema = (StructType) StructType.fromJson(sparkSchemaJson); + assertEquals(sparkSchema.json(), convertedSparkSchema.json()); + // Test complex type with nullable + 
StructField field0 = new StructField("f0", new ArrayType(StringType$.MODULE$, true), false, Metadata.empty()); + StructField field1 = new StructField("f1", new MapType(StringType$.MODULE$, IntegerType$.MODULE$, true), false, Metadata.empty()); + StructType sparkSchemaWithNullable = new StructType(new StructField[]{field0, field1}); + String sparkSchemaWithNullableJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( + spark2ParquetConverter.convert(sparkSchemaWithNullable).asGroupType()); + StructType convertedSparkSchemaWithNullable = (StructType) StructType.fromJson(sparkSchemaWithNullableJson); + assertEquals(sparkSchemaWithNullable.json(), convertedSparkSchemaWithNullable.json()); + } +} diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index 90f6017f1c060..1107d744e68b0 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -108,6 +108,10 @@ public FileSystem getFs() { return fs; } + public boolean isBootstrap() { + return metaClient.getTableConfig().getBootstrapBasePath().isPresent(); + } + public void closeQuietly(ResultSet resultSet, Statement stmt) { try { if (stmt != null) {
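
A minimal usage sketch (not part of the patch): the new hoodie.datasource.hive_sync.sync_as_datasource option, which defaults to true, controls whether the Hive sync registers the table with Spark's data source catalog properties. The sync option keys below come from this patch; the remaining write options, input path, field names, and table name are illustrative assumptions.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HiveSyncAsDataSourceExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("hudi-hive-sync-sketch").getOrCreate();
    // Illustrative input; any DataFrame with the expected columns would do.
    Dataset<Row> df = spark.read().json("/tmp/input/stock_ticks.json");

    df.write().format("hudi")
        .option("hoodie.table.name", "stock_ticks_cow")
        .option("hoodie.datasource.write.recordkey.field", "key")
        .option("hoodie.datasource.write.partitionpath.field", "date")
        .option("hoodie.datasource.write.precombine.field", "ts")
        // Enable Hive sync; with sync_as_datasource=true (the default) the table is
        // registered in the metastore with spark.sql.sources.* table properties.
        .option("hoodie.datasource.hive_sync.enable", "true")
        .option("hoodie.datasource.hive_sync.sync_as_datasource", "true")
        .option("hoodie.datasource.hive_sync.table", "stock_ticks_cow")
        .option("hoodie.datasource.hive_sync.partition_fields", "date")
        .mode(SaveMode.Append)
        .save("/tmp/hoodie/stock_ticks_cow");

    spark.stop();
  }
}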
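For the standalone sync path, the two new HiveSyncConfig fields correspond to the --spark-datasource and --spark-schema-length-threshold command line flags. The sketch below mirrors the syncHive helper in HoodieSparkSqlWriter under stated assumptions: the database name, table name, and base path are placeholders, and connection settings (JDBC URL, credentials, partition fields) are omitted for brevity.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;

public class StandaloneHiveSyncSketch {
  public static void main(String[] args) throws Exception {
    Configuration hadoopConf = new Configuration();
    FileSystem fs = FileSystem.get(hadoopConf);

    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.basePath = "/tmp/hoodie/stock_ticks_cow";   // illustrative base path
    cfg.databaseName = "default";                   // illustrative database
    cfg.tableName = "stock_ticks_cow";              // illustrative table name
    cfg.syncAsSparkDataSourceTable = true;          // new field, CLI flag --spark-datasource
    cfg.sparkSchemaLengthThreshold = 4000;          // new field, CLI flag --spark-schema-length-threshold

    HiveConf hiveConf = new HiveConf();
    hiveConf.addResource(fs.getConf());
    new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();
  }
}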
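On the read side, DefaultSource now keys off the hoodie.query.as.ro.table serde property (ConfigUtils.IS_QUERY_AS_RO_TABLE) instead of the earlier spark.query.* keys. The sketch below restates that round trip outside of the Hudi classes: the property key and the query-type values are taken from this patch, while the class name and path used here are purely illustrative.

import java.util.HashMap;
import java.util.Map;

public class RoRtResolutionSketch {

  // Same literal as ConfigUtils.IS_QUERY_AS_RO_TABLE in this patch.
  private static final String IS_QUERY_AS_RO_TABLE = "hoodie.query.as.ro.table";

  public static void main(String[] args) {
    // What HiveSyncTool#getSparkSerdeProperties stores for the "_ro" table of a MOR dataset.
    Map<String, String> serdeProperties = new HashMap<>();
    serdeProperties.put("path", "/tmp/hoodie/stock_ticks_mor");   // illustrative base path
    serdeProperties.put(IS_QUERY_AS_RO_TABLE, "true");            // the "_rt" table stores "false"

    // What DefaultSource derives from it when Spark resolves the relation.
    String queryType = Boolean.parseBoolean(serdeProperties.get(IS_QUERY_AS_RO_TABLE))
        ? "read_optimized"   // QUERY_TYPE_READ_OPTIMIZED_OPT_VAL
        : "snapshot";        // QUERY_TYPE_SNAPSHOT_OPT_VAL
    System.out.println("Resolved query type: " + queryType);
  }
}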