diff --git a/docker/demo/hive-table-check.commands b/docker/demo/hive-table-check.commands index 1102ca53bbbd0..8cdf033a79565 100644 --- a/docker/demo/hive-table-check.commands +++ b/docker/demo/hive-table-check.commands @@ -22,13 +22,6 @@ show partitions stock_ticks_cow; show partitions stock_ticks_mor_ro; show partitions stock_ticks_mor_rt; -show create table stock_ticks_cow; -show create table stock_ticks_mor_ro; -show create table stock_ticks_mor_rt; -show create table stock_ticks_cow_bs; -show create table stock_ticks_mor_bs_ro; -show create table stock_ticks_mor_bs_rt; - !quit diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java index 655048c8a1b28..c8de880bdd91a 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java @@ -253,10 +253,6 @@ private void testHiveAfterFirstBatch() throws Exception { assertStdOutContains(stdOutErrPair, "| partition |\n+----------------+\n| dt=2018-08-31 |\n+----------------+\n", 3); - // There should have 5 data source tables except stock_ticks_mor_bs_rt. - // After [HUDI-2071] has solved, we can inc the number 5 to 6. - assertStdOutContains(stdOutErrPair, "'spark.sql.sources.provider'='hudi'", 5); - stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS); assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n+---------+----------------------+\n" + "| GOOG | 2018-08-31 10:29:00 |\n", 6); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 95b883abf90a1..ce3683126c8a0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -355,6 +355,7 @@ object DataSourceWriteOptions { // HIVE SYNC SPECIFIC CONFIGS // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes // unexpected issues with config getting reset + val HIVE_SYNC_ENABLED_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.hive_sync.enable") .defaultValue("false") @@ -441,6 +442,16 @@ object DataSourceWriteOptions { .withDocumentation("‘INT64’ with original type TIMESTAMP_MICROS is converted to hive ‘timestamp’ type. 
" + "Disabled by default for backward compatibility.") + val HIVE_TABLE_PROPERTIES: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.hive_sync.table_properties") + .noDefaultValue() + .withDocumentation("") + + val HIVE_TABLE_SERDE_PROPERTIES: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.hive_sync.serde_properties") + .noDefaultValue() + .withDocumentation("") + val HIVE_SYNC_AS_DATA_SOURCE_TABLE: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.hive_sync.sync_as_datasource") .defaultValue("true") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 146971e18661c..1097c420971ff 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.HoodieROTablePathFilter -import org.apache.hudi.hive.util.ConfigUtils import org.apache.log4j.LogManager import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation} @@ -106,14 +105,8 @@ class DefaultSource extends RelationProvider val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build() val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent val tableType = metaClient.getTableType - - // First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool, - // or else use query type from QUERY_TYPE_OPT_KEY. 
- val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) - .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL) - .getOrElse(parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue())) - - log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType") + val queryType = parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue) + log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType") (tableType, queryType, isBootstrappedTable) match { case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index e62a56957edab..b290533dd348b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieException import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory +import org.apache.hudi.hive.util.ConfigUtils import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} import org.apache.hudi.index.SparkHoodieIndex import org.apache.hudi.internal.DataSourceInternalWriterHelper @@ -47,9 +48,11 @@ import org.apache.spark.SPARK_VERSION import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD +import org.apache.spark.sql.hudi.HoodieSqlUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, Dataset,Row, SQLContext, SaveMode, SparkSession} -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession} import scala.collection.JavaConversions._ import scala.collection.mutable.ListBuffer @@ -418,15 +421,15 @@ object HoodieSparkSqlWriter { } } - private def syncHive(basePath: Path, fs: FileSystem, hoodieConfig: HoodieConfig, sqlConf: SQLConf): Boolean = { - val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig, sqlConf) + private def syncHive(basePath: Path, fs: FileSystem, hoodieConfig: HoodieConfig): Boolean = { + val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig) val hiveConf: HiveConf = new HiveConf() hiveConf.addResource(fs.getConf) new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable() true } - private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig, sqlConf: SQLConf): HiveSyncConfig = { + private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig): HiveSyncConfig = { val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig() hiveSyncConfig.basePath = basePath.toString hiveSyncConfig.baseFileFormat = hoodieConfig.getString(HIVE_BASE_FILE_FORMAT_OPT_KEY) @@ -451,12 +454,77 @@ object HoodieSparkSqlWriter { hiveSyncConfig.decodePartition = hoodieConfig.getStringOrDefault(URL_ENCODE_PARTITIONING_OPT_KEY).toBoolean hiveSyncConfig.batchSyncNum = 
hoodieConfig.getStringOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM).toInt
-    hiveSyncConfig.syncAsSparkDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
-    hiveSyncConfig.sparkSchemaLengthThreshold = sqlConf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD)
+    val syncAsDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
+    if (syncAsDataSourceTable) {
+      hiveSyncConfig.tableProperties = hoodieConfig.getStringOrDefault(HIVE_TABLE_PROPERTIES, null)
+      val serdePropText = createSqlTableSerdeProperties(hoodieConfig, basePath.toString)
+      val serdeProp = ConfigUtils.toMap(serdePropText)
+      serdeProp.put(ConfigUtils.SPARK_QUERY_TYPE_KEY, DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key)
+      serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+
+      hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp)
+    }
     hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)
     hiveSyncConfig
   }
+  /**
+   * Add the Spark SQL related table properties to HIVE_TABLE_PROPERTIES.
+   * @param sqlConf The spark sql conf.
+   * @param schema The schema to write to the table.
+   * @param hoodieConfig The HoodieConfig holding the original write parameters.
+   * @return A new HoodieConfig with the HIVE_TABLE_PROPERTIES property added.
+   */
+  private def addSqlTableProperties(sqlConf: SQLConf, schema: StructType,
+                                    hoodieConfig: HoodieConfig): HoodieConfig = {
+    // Convert the schema and partition info used by spark sql to hive table properties.
+    // The following code refers to the spark code in
+    // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+
+    // Sync schema with meta fields
+    val schemaWithMetaFields = HoodieSqlUtils.addMetaFields(schema)
+    val partitionSet = hoodieConfig.getString(HIVE_PARTITION_FIELDS_OPT_KEY)
+      .split(",").map(_.trim).filter(!_.isEmpty).toSet
+    val threshold = sqlConf.getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
+
+    val (partitionCols, dataCols) = schemaWithMetaFields.partition(c => partitionSet.contains(c.name))
+    val reOrderedType = StructType(dataCols ++ partitionCols)
+    val schemaParts = reOrderedType.json.grouped(threshold).toSeq
+
+    var properties = Map(
+      "spark.sql.sources.provider" -> "hudi",
+      "spark.sql.sources.schema.numParts" -> schemaParts.size.toString
+    )
+    schemaParts.zipWithIndex.foreach { case (part, index) =>
+      properties += s"spark.sql.sources.schema.part.$index" -> part
+    }
+    // add partition columns
+    if (partitionSet.nonEmpty) {
+      properties += "spark.sql.sources.schema.numPartCols" -> partitionSet.size.toString
+      partitionSet.zipWithIndex.foreach { case (partCol, index) =>
+        properties += s"spark.sql.sources.schema.partCol.$index" -> partCol
+      }
+    }
+    var sqlPropertyText = ConfigUtils.configToString(properties)
+    sqlPropertyText = if (hoodieConfig.contains(HIVE_TABLE_PROPERTIES)) {
+      sqlPropertyText + "\n" + hoodieConfig.getString(HIVE_TABLE_PROPERTIES)
+    } else {
+      sqlPropertyText
+    }
+    hoodieConfig.setValue(HIVE_TABLE_PROPERTIES, sqlPropertyText)
+    hoodieConfig
+  }
+
+  private def createSqlTableSerdeProperties(hoodieConfig: HoodieConfig, basePath: String): String = {
+    val pathProp = s"path=$basePath"
+    if (hoodieConfig.contains(HIVE_TABLE_SERDE_PROPERTIES)) {
+      pathProp + "\n" + hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES)
+    } else {
+      pathProp
+    }
+  }
+
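[Reviewer aside, not part of the patch] The serde property text built above is plain newline-separated key=value pairs, which is also the format the updated assertions in HoodieSparkSqlWriterSuite below expect. A minimal sketch of the round trip, assuming ConfigUtils.configToString/toMap simply join and split such pairs:

    // Illustrative only; the exact helper behaviour is inferred from the test assertions below.
    import scala.collection.JavaConverters._
    val serdeText =
      """path=/tmp/hoodie_test
        |spark.query.type.key=hoodie.datasource.query.type
        |spark.query.as.rt.key=snapshot
        |spark.query.as.ro.key=read_optimized""".stripMargin
    val serdeMap = ConfigUtils.toMap(serdeText).asScala  // Map(path -> /tmp/hoodie_test, spark.query.type.key -> ..., ...)

HiveSyncTool later consumes the three spark.query.* helper keys and collapses them into a single 'hoodie.datasource.query.type' serde property on each synced table.
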
private def metaSync(spark: SparkSession, hoodieConfig: HoodieConfig, basePath: Path, schema: StructType): Boolean = { val hiveSyncEnabled = hoodieConfig.getStringOrDefault(HIVE_SYNC_ENABLED_OPT_KEY).toBoolean @@ -464,6 +532,7 @@ object HoodieSparkSqlWriter { var syncClientToolClassSet = scala.collection.mutable.Set[String]() hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS).split(",").foreach(syncClass => syncClientToolClassSet += syncClass) + val newHoodieConfig = addSqlTableProperties(spark.sessionState.conf, schema, hoodieConfig) // for backward compatibility if (hiveSyncEnabled) { metaSyncEnabled = true @@ -476,12 +545,12 @@ object HoodieSparkSqlWriter { val syncSuccess = impl.trim match { case "org.apache.hudi.hive.HiveSyncTool" => { log.info("Syncing to Hive Metastore (URL: " + hoodieConfig.getString(HIVE_URL_OPT_KEY) + ")") - syncHive(basePath, fs, hoodieConfig, spark.sessionState.conf) + syncHive(basePath, fs, newHoodieConfig) true } case _ => { val properties = new Properties() - properties.putAll(hoodieConfig.getProps) + properties.putAll(newHoodieConfig.getProps) properties.put("basePath", basePath.toString) val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool] syncHoodie.syncHoodieTable() diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala index 6f5f6e699359b..d37dac444cf39 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala @@ -37,7 +37,8 @@ import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUt import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.functions.{expr, lit} -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{spy, times, verify} @@ -537,6 +538,11 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { test("Test build sync config for spark sql") { initSparkContext("test build sync config") + val addSqlTablePropertiesMethod = + HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties", + classOf[SQLConf], classOf[StructType], classOf[HoodieConfig]) + addSqlTablePropertiesMethod.setAccessible(true) + val schema = DataSourceTestUtils.getStructTypeExampleSchema val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val basePath = "/tmp/hoodie_test" @@ -549,23 +555,49 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { ) val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params) val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters) + val newHoodieConfig = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter, + spark.sessionState.conf, structType, hoodieConfig) + .asInstanceOf[HoodieConfig] val buildSyncConfigMethod = HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path], - classOf[HoodieConfig], classOf[SQLConf]) + classOf[HoodieConfig]) 
buildSyncConfigMethod.setAccessible(true) val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter, - new Path(basePath), hoodieConfig, spark.sessionState.conf).asInstanceOf[HiveSyncConfig] + new Path(basePath), newHoodieConfig).asInstanceOf[HiveSyncConfig] + assertTrue(hiveSyncConfig.skipROSuffix) assertTrue(hiveSyncConfig.createManagedTable) - assertTrue(hiveSyncConfig.syncAsSparkDataSourceTable) - assertResult(spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD))(hiveSyncConfig.sparkSchemaLengthThreshold) + assertResult("spark.sql.sources.provider=hudi\n" + + "spark.sql.sources.schema.partCol.0=partition\n" + + "spark.sql.sources.schema.numParts=1\n" + + "spark.sql.sources.schema.numPartCols=1\n" + + "spark.sql.sources.schema.part.0=" + + "{\"type\":\"struct\",\"fields\":[{\"name\":\"_hoodie_commit_time\"," + + "\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":" + + "\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," + + "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties) + assertResult("path=/tmp/hoodie_test\n" + + "spark.query.type.key=hoodie.datasource.query.type\n" + + "spark.query.as.rt.key=snapshot\n" + + "spark.query.as.ro.key=read_optimized")(hiveSyncConfig.serdeProperties) } test("Test build sync config for skip Ro Suffix vals") { initSparkContext("test build sync config for skip Ro suffix vals") + val addSqlTablePropertiesMethod = + HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties", + classOf[SQLConf], classOf[StructType], classOf[HoodieConfig]) + addSqlTablePropertiesMethod.setAccessible(true) + val schema = DataSourceTestUtils.getStructTypeExampleSchema + val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val basePath = "/tmp/hoodie_test" val params = Map( "path" -> basePath, @@ -574,14 +606,18 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { ) val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params) val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters) + val newHoodieConfig = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter, + spark.sessionState.conf, structType, hoodieConfig) + .asInstanceOf[HoodieConfig] val buildSyncConfigMethod = HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path], - classOf[HoodieConfig], classOf[SQLConf]) + classOf[HoodieConfig]) buildSyncConfigMethod.setAccessible(true) val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter, - new Path(basePath), hoodieConfig, spark.sessionState.conf).asInstanceOf[HiveSyncConfig] + new Path(basePath), newHoodieConfig).asInstanceOf[HiveSyncConfig] + assertFalse(hiveSyncConfig.skipROSuffix) } diff --git a/hudi-sync/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml index 03e8b3e3701ea..fd63028951047 100644 --- a/hudi-sync/hudi-hive-sync/pom.xml +++ b/hudi-sync/hudi-hive-sync/pom.xml @@ -150,12 +150,6 @@ test - - org.apache.spark - spark-sql_${scala.binary.version} - test - 
- org.eclipse.jetty.aggregate diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java index 47f23864c01ca..09c3e7b354f2b 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java @@ -110,12 +110,6 @@ public class HiveSyncConfig implements Serializable { @Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive") public Integer batchSyncNum = 1000; - @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.") - public Boolean syncAsSparkDataSourceTable = true; - - @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.") - public int sparkSchemaLengthThreshold = 4000; - // enhance the similar function in child class public static HiveSyncConfig copy(HiveSyncConfig cfg) { HiveSyncConfig newConfig = new HiveSyncConfig(); @@ -137,8 +131,6 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) { newConfig.serdeProperties = cfg.serdeProperties; newConfig.createManagedTable = cfg.createManagedTable; newConfig.batchSyncNum = cfg.batchSyncNum; - newConfig.syncAsSparkDataSourceTable = cfg.syncAsSparkDataSourceTable; - newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold; return newConfig; } @@ -168,8 +160,6 @@ public String toString() { + ", supportTimestamp=" + supportTimestamp + ", decodePartition=" + decodePartition + ", createManagedTable=" + createManagedTable - + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable - + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold + '}'; } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 88efabea8fe24..7264c8dffea9d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -20,13 +20,11 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.InvalidTableException; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hive.util.ConfigUtils; -import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; import org.apache.hudi.hive.util.HiveSchemaUtil; @@ -39,20 +37,13 @@ import org.apache.hudi.sync.common.AbstractSyncTool; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import static org.apache.parquet.schema.OriginalType.UTF8; -import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-
 /**
  * Tool to sync a hoodie HDFS table with a hive metastore table. Either use it as a api
  * HiveSyncTool.syncHoodieTable(HiveSyncConfig) or as a command line java -cp hoodie-hive-sync.jar HiveSyncTool [args]
@@ -161,16 +152,6 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
     // Get the parquet schema for this table looking at the latest commit
     MessageType schema = hoodieHiveClient.getDataSchema();
-
-    // Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table,
-    // so we disable the syncAsSparkDataSourceTable here to avoid read such kind table
-    // by the data source way (which will use the HoodieBootstrapRelation).
-    // TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logical.
-    if (hoodieHiveClient.isBootstrap()
-        && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
-        && !readAsOptimized) {
-      cfg.syncAsSparkDataSourceTable = false;
-    }
     // Sync schema if needed
     syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
@@ -199,15 +180,6 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
    */
   private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
                           boolean readAsOptimized, MessageType schema) {
-    // Append spark table properties & serde properties
-    Map<String, String> tableProperties = ConfigUtils.toMap(cfg.tableProperties);
-    Map<String, String> serdeProperties = ConfigUtils.toMap(cfg.serdeProperties);
-    if (cfg.syncAsSparkDataSourceTable) {
-      Map<String, String> sparkTableProperties = getSparkTableProperties(cfg.sparkSchemaLengthThreshold, schema);
-      Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized);
-      tableProperties.putAll(sparkTableProperties);
-      serdeProperties.putAll(sparkSerdeProperties);
-    }
     // Check and sync schema
     if (!tableExists) {
       LOG.info("Hive table " + tableName + " is not found. Creating it");
@@ -224,11 +196,27 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
       String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
       String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
+      Map<String, String> serdeProperties = ConfigUtils.toMap(cfg.serdeProperties);
+
+      // Currently the serde properties are non-empty only when syncing the Spark datasource metadata.
+ if (!serdeProperties.isEmpty()) { + String queryTypeKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_TYPE_KEY); + String queryAsROKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_AS_RO_KEY); + String queryAsRTKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_AS_RT_KEY); + + if (queryTypeKey != null && queryAsROKey != null && queryAsRTKey != null) { + if (readAsOptimized) { // read optimized + serdeProperties.put(queryTypeKey, queryAsROKey); + } else { // read snapshot + serdeProperties.put(queryTypeKey, queryAsRTKey); + } + } + } // Custom serde will not work with ALTER TABLE REPLACE COLUMNS // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive // /ql/exec/DDLTask.java#L3488 hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, - outputFormatClassName, serDeFormatClassName, serdeProperties, tableProperties); + outputFormatClassName, serDeFormatClassName, serdeProperties, ConfigUtils.toMap(cfg.tableProperties)); } else { // Check if the table schema has evolved Map tableSchema = hoodieHiveClient.getTableSchema(tableName); @@ -238,6 +226,7 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi hoodieHiveClient.updateTableDefinition(tableName, schema); // Sync the table properties if the schema has changed if (cfg.tableProperties != null) { + Map tableProperties = ConfigUtils.toMap(cfg.tableProperties); hoodieHiveClient.updateTableProperties(tableName, tableProperties); LOG.info("Sync table properties for " + tableName + ", table properties is: " + cfg.tableProperties); } @@ -247,72 +236,6 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi } } - /** - * Get Spark Sql related table properties. This is used for spark datasource table. - * @param schema The schema to write to the table. - * @return A new parameters added the spark's table properties. - */ - private Map getSparkTableProperties(int schemaLengthThreshold, MessageType schema) { - // Convert the schema and partition info used by spark sql to hive table properties. - // The following code refers to the spark code in - // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala - GroupType originGroupType = schema.asGroupType(); - List partitionNames = cfg.partitionFields; - List partitionCols = new ArrayList<>(); - List dataCols = new ArrayList<>(); - Map column2Field = new HashMap<>(); - - for (Type field : originGroupType.getFields()) { - column2Field.put(field.getName(), field); - } - // Get partition columns and data columns. - for (String partitionName : partitionNames) { - // Default the unknown partition fields to be String. - // Keep the same logical with HiveSchemaUtil#getPartitionKeyType. - partitionCols.add(column2Field.getOrDefault(partitionName, - new PrimitiveType(Type.Repetition.REQUIRED, BINARY, partitionName, UTF8))); - } - - for (Type field : originGroupType.getFields()) { - if (!partitionNames.contains(field.getName())) { - dataCols.add(field); - } - } - - List reOrderedFields = new ArrayList<>(); - reOrderedFields.addAll(dataCols); - reOrderedFields.addAll(partitionCols); - GroupType reOrderedType = new GroupType(originGroupType.getRepetition(), originGroupType.getName(), reOrderedFields); - - Map sparkProperties = new HashMap<>(); - sparkProperties.put("spark.sql.sources.provider", "hudi"); - // Split the schema string to multi-parts according the schemaLengthThreshold size. 
- String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType); - int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold; - sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart)); - // Add each part of schema string to sparkProperties - for (int i = 0; i < numSchemaPart; i++) { - int start = i * schemaLengthThreshold; - int end = Math.min(start + schemaLengthThreshold, schemaString.length()); - sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end)); - } - // Add partition columns - if (!partitionNames.isEmpty()) { - sparkProperties.put("spark.sql.sources.schema.numPartCols", String.valueOf(partitionNames.size())); - for (int i = 0; i < partitionNames.size(); i++) { - sparkProperties.put("spark.sql.sources.schema.partCol." + i, partitionNames.get(i)); - } - } - return sparkProperties; - } - - private Map getSparkSerdeProperties(boolean readAsOptimized) { - Map sparkSerdeProperties = new HashMap<>(); - sparkSerdeProperties.put("path", cfg.basePath); - sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized)); - return sparkSerdeProperties; - } - /** * Syncs the list of storage parititions passed in (checks if the partition is in hive, if not adds it or if the * partition path does not match, it updates the partition path). diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java index 94ebdaadd8ff3..b8745b6e30807 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java @@ -23,11 +23,12 @@ import org.apache.hudi.common.util.StringUtils; public class ConfigUtils { - /** - * Config stored in hive serde properties to tell query engine (spark/flink) to - * read the table as a read-optimized table when this config is true. - */ - public static final String IS_QUERY_AS_RO_TABLE = "hoodie.query.as.ro.table"; + + public static final String SPARK_QUERY_TYPE_KEY = "spark.query.type.key"; + + public static final String SPARK_QUERY_AS_RO_KEY = "spark.query.as.ro.key"; + + public static final String SPARK_QUERY_AS_RT_KEY = "spark.query.as.rt.key"; /** * Convert the key-value config to a map.The format of the config diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/Parquet2SparkSchemaUtils.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/Parquet2SparkSchemaUtils.java deleted file mode 100644 index debc262b5518c..0000000000000 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/Parquet2SparkSchemaUtils.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.hive.util; - -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; - -import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; - -/** - * Convert the parquet schema to spark schema' json string. - * This code is refer to org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter - * in spark project. - */ -public class Parquet2SparkSchemaUtils { - - public static String convertToSparkSchemaJson(GroupType parquetSchema) { - String fieldsJsonString = parquetSchema.getFields().stream().map(field -> { - switch (field.getRepetition()) { - case OPTIONAL: - return "{\"name\":\"" + field.getName() + "\",\"type\":" + convertFieldType(field) - + ",\"nullable\":true,\"metadata\":{}}"; - case REQUIRED: - return "{\"name\":\"" + field.getName() + "\",\"type\":" + convertFieldType(field) - + ",\"nullable\":false,\"metadata\":{}}"; - case REPEATED: - String arrayType = arrayType(field, false); - return "{\"name\":\"" + field.getName() + "\",\"type\":" + arrayType - + ",\"nullable\":false,\"metadata\":{}}"; - default: - throw new UnsupportedOperationException("Unsupport convert " + field + " to spark sql type"); - } - }).reduce((a, b) -> a + "," + b).orElse(""); - return "{\"type\":\"struct\",\"fields\":[" + fieldsJsonString + "]}"; - } - - private static String convertFieldType(Type field) { - if (field instanceof PrimitiveType) { - return "\"" + convertPrimitiveType((PrimitiveType) field) + "\""; - } else { - assert field instanceof GroupType; - return convertGroupField((GroupType) field); - } - } - - private static String convertPrimitiveType(PrimitiveType field) { - PrimitiveType.PrimitiveTypeName typeName = field.getPrimitiveTypeName(); - OriginalType originalType = field.getOriginalType(); - - switch (typeName) { - case BOOLEAN: return "boolean"; - case FLOAT: return "float"; - case DOUBLE: return "double"; - case INT32: - if (originalType == null) { - return "integer"; - } - switch (originalType) { - case INT_8: return "byte"; - case INT_16: return "short"; - case INT_32: return "integer"; - case DATE: return "date"; - case DECIMAL: - return "decimal(" + field.getDecimalMetadata().getPrecision() + "," - + field.getDecimalMetadata().getScale() + ")"; - default: throw new UnsupportedOperationException("Unsupport convert " + typeName + " to spark sql type"); - } - case INT64: - if (originalType == null) { - return "long"; - } - switch (originalType) { - case INT_64: return "long"; - case DECIMAL: - return "decimal(" + field.getDecimalMetadata().getPrecision() + "," - + field.getDecimalMetadata().getScale() + ")"; - case TIMESTAMP_MICROS: - case TIMESTAMP_MILLIS: - return "timestamp"; - default: - throw new UnsupportedOperationException("Unsupport convert " + typeName + " to spark sql type"); - } - case INT96: return "timestamp"; - - case BINARY: - if (originalType == null) { - return "binary"; - } - switch (originalType) { - case UTF8: - case ENUM: 
- case JSON: - return "string"; - case BSON: return "binary"; - case DECIMAL: - return "decimal(" + field.getDecimalMetadata().getPrecision() + "," - + field.getDecimalMetadata().getScale() + ")"; - default: - throw new UnsupportedOperationException("Unsupport convert " + typeName + " to spark sql type"); - } - - case FIXED_LEN_BYTE_ARRAY: - switch (originalType) { - case DECIMAL: - return "decimal(" + field.getDecimalMetadata().getPrecision() + "," - + field.getDecimalMetadata().getScale() + ")"; - default: - throw new UnsupportedOperationException("Unsupport convert " + typeName + " to spark sql type"); - } - default: - throw new UnsupportedOperationException("Unsupport convert " + typeName + " to spark sql type"); - } - } - - private static String convertGroupField(GroupType field) { - if (field.getOriginalType() == null) { - return convertToSparkSchemaJson(field); - } - switch (field.getOriginalType()) { - case LIST: - ValidationUtils.checkArgument(field.getFieldCount() == 1, "Illegal List type: " + field); - Type repeatedType = field.getType(0); - if (isElementType(repeatedType, field.getName())) { - return arrayType(repeatedType, false); - } else { - Type elementType = repeatedType.asGroupType().getType(0); - boolean optional = elementType.isRepetition(OPTIONAL); - return arrayType(elementType, optional); - } - case MAP: - case MAP_KEY_VALUE: - GroupType keyValueType = field.getType(0).asGroupType(); - Type keyType = keyValueType.getType(0); - Type valueType = keyValueType.getType(1); - boolean valueOptional = valueType.isRepetition(OPTIONAL); - return "{\"type\":\"map\", \"keyType\":" + convertFieldType(keyType) - + ",\"valueType\":" + convertFieldType(valueType) - + ",\"valueContainsNull\":" + valueOptional + "}"; - default: - throw new UnsupportedOperationException("Unsupport convert " + field + " to spark sql type"); - } - } - - private static String arrayType(Type elementType, boolean containsNull) { - return "{\"type\":\"array\", \"elementType\":" + convertFieldType(elementType) + ",\"containsNull\":" + containsNull + "}"; - } - - private static boolean isElementType(Type repeatedType, String parentName) { - return repeatedType.isPrimitive() || repeatedType.asGroupType().getFieldCount() > 1 - || repeatedType.getName().equals("array") || repeatedType.getName().equals(parentName + "_tuple"); - } -} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 3494e4451fbab..c4125337ea9d8 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -42,6 +42,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; + import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; @@ -69,11 +70,6 @@ private static Iterable useJdbcAndSchemaFromCommitMetadataAndManagedTa return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}}); } - // (useJdbc, useSchemaFromCommitMetadata, syncAsDataSource) - private static Iterable syncDataSourceTableParams() { - return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}}); - } - @BeforeEach public void setUp() throws Exception { HiveTestUtil.setUp(); @@ -161,15 
+157,17 @@ public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) } @ParameterizedTest - @MethodSource({"syncDataSourceTableParams"}) + @MethodSource({"useJdbcAndSchemaFromCommitMetadata"}) public void testSyncCOWTableWithProperties(boolean useJdbc, - boolean useSchemaFromCommitMetadata, - boolean syncAsDataSourceTable) throws Exception { + boolean useSchemaFromCommitMetadata) throws Exception { HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; Map serdeProperties = new HashMap() { { put("path", hiveSyncConfig.basePath); + put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type"); + put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized"); + put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot"); } }; @@ -179,7 +177,6 @@ public void testSyncCOWTableWithProperties(boolean useJdbc, put("tp_1", "p1"); } }; - hiveSyncConfig.syncAsSparkDataSourceTable = syncAsDataSourceTable; hiveSyncConfig.useJdbc = useJdbc; hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties); hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties); @@ -198,12 +195,9 @@ public void testSyncCOWTableWithProperties(boolean useJdbc, String tblPropertiesWithoutDdlTime = String.join("\n", results.subList(0, results.size() - 1)); - - String sparkTableProperties = getSparkTableProperties(syncAsDataSourceTable, useSchemaFromCommitMetadata); assertEquals( "EXTERNAL\tTRUE\n" + "last_commit_time_sync\t100\n" - + sparkTableProperties + "tp_0\tp0\n" + "tp_1\tp1", tblPropertiesWithoutDdlTime); assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime")); @@ -214,54 +208,21 @@ public void testSyncCOWTableWithProperties(boolean useJdbc, hiveDriver.getResults(results); String ddl = String.join("\n", results); assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'")); - if (syncAsDataSourceTable) { - assertTrue(ddl.contains("'" + ConfigUtils.IS_QUERY_AS_RO_TABLE + "'='false'")); - } - } - - private String getSparkTableProperties(boolean syncAsDataSourceTable, boolean useSchemaFromCommitMetadata) { - if (syncAsDataSourceTable) { - if (useSchemaFromCommitMetadata) { - return "spark.sql.sources.provider\thudi\n" - + "spark.sql.sources.schema.numPartCols\t1\n" - + "spark.sql.sources.schema.numParts\t1\n" - + "spark.sql.sources.schema.part.0\t{\"type\":\"struct\",\"fields\":" - + "[{\"name\":\"_hoodie_commit_time\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," - + "{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," - + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," - + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," - + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," - + "{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," - + "{\"name\":\"favorite_number\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}," - + "{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," - + "{\"name\":\"datestr\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}\n" - + "spark.sql.sources.schema.partCol.0\tdatestr\n"; - } else { - return "spark.sql.sources.provider\thudi\n" - + "spark.sql.sources.schema.numPartCols\t1\n" - + "spark.sql.sources.schema.numParts\t1\n" - + 
"spark.sql.sources.schema.part.0\t{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":" - + "\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"favorite_number\",\"type\":\"integer\"," - + "\"nullable\":false,\"metadata\":{}},{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false," - + "\"metadata\":{}}]}\n" - + "{\"name\":\"datestr\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}\n" - + "spark.sql.sources.schema.partCol.0\tdatestr\n"; - } - } else { - return ""; - } + assertTrue(ddl.contains("'hoodie.datasource.query.type'='snapshot'")); } @ParameterizedTest - @MethodSource({"syncDataSourceTableParams"}) + @MethodSource({"useJdbcAndSchemaFromCommitMetadata"}) public void testSyncMORTableWithProperties(boolean useJdbc, - boolean useSchemaFromCommitMetadata, - boolean syncAsDataSourceTable) throws Exception { + boolean useSchemaFromCommitMetadata) throws Exception { HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; Map serdeProperties = new HashMap() { { put("path", hiveSyncConfig.basePath); + put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type"); + put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized"); + put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot"); } }; @@ -271,7 +232,6 @@ public void testSyncMORTableWithProperties(boolean useJdbc, put("tp_1", "p1"); } }; - hiveSyncConfig.syncAsSparkDataSourceTable = syncAsDataSourceTable; hiveSyncConfig.useJdbc = useJdbc; hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties); hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties); @@ -287,15 +247,14 @@ public void testSyncMORTableWithProperties(boolean useJdbc, String rtTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; String[] tableNames = new String[] {roTableName, rtTableName}; - String[] readAsOptimizedResults = new String[] {"true", "false"}; + String[] expectQueryTypes = new String[] {"read_optimized", "snapshot"}; SessionState.start(HiveTestUtil.getHiveConf()); Driver hiveDriver = new org.apache.hadoop.hive.ql.Driver(HiveTestUtil.getHiveConf()); - String sparkTableProperties = getSparkTableProperties(syncAsDataSourceTable, useSchemaFromCommitMetadata); for (int i = 0;i < 2; i++) { String dbTableName = hiveSyncConfig.databaseName + "." 
+ tableNames[i]; - String readAsOptimized = readAsOptimizedResults[i]; + String expectQueryType = expectQueryTypes[i]; hiveDriver.run("SHOW TBLPROPERTIES " + dbTableName); List results = new ArrayList<>(); @@ -306,7 +265,6 @@ public void testSyncMORTableWithProperties(boolean useJdbc, assertEquals( "EXTERNAL\tTRUE\n" + "last_commit_time_sync\t101\n" - + sparkTableProperties + "tp_0\tp0\n" + "tp_1\tp1", tblPropertiesWithoutDdlTime); assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime")); @@ -317,10 +275,8 @@ public void testSyncMORTableWithProperties(boolean useJdbc, hiveDriver.getResults(results); String ddl = String.join("\n", results); assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'")); + assertTrue(ddl.contains("'hoodie.datasource.query.type'='" + expectQueryType + "'")); assertTrue(ddl.toLowerCase().contains("create external table")); - if (syncAsDataSourceTable) { - assertTrue(ddl.contains("'" + ConfigUtils.IS_QUERY_AS_RO_TABLE + "'='" + readAsOptimized + "'")); - } } } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java deleted file mode 100644 index 15575c4607b27..0000000000000 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.hive; - -import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils; -import org.apache.spark.sql.execution.SparkSqlParser; -import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter; -import org.apache.spark.sql.internal.SQLConf; -import org.apache.spark.sql.types.ArrayType; -import org.apache.spark.sql.types.MapType; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.IntegerType$; -import org.apache.spark.sql.types.StringType$; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class TestParquet2SparkSchemaUtils { - private final SparkToParquetSchemaConverter spark2ParquetConverter = - new SparkToParquetSchemaConverter( - (Boolean) SQLConf.PARQUET_WRITE_LEGACY_FORMAT().defaultValue().get(), - SQLConf.ParquetOutputTimestampType$.MODULE$.INT96()); - private final SparkSqlParser parser = new SparkSqlParser(new SQLConf()); - - @Test - public void testConvertPrimitiveType() { - StructType sparkSchema = parser.parseTableSchema( - "f0 int, f1 string, f3 bigint," - + " f4 decimal(5,2), f5 timestamp, f6 date," - + " f7 short, f8 float, f9 double, f10 byte," - + " f11 tinyint, f12 smallint, f13 binary, f14 boolean"); - - String sparkSchemaJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( - spark2ParquetConverter.convert(sparkSchema).asGroupType()); - StructType convertedSparkSchema = (StructType) StructType.fromJson(sparkSchemaJson); - assertEquals(sparkSchema.json(), convertedSparkSchema.json()); - // Test type with nullable - StructField field0 = new StructField("f0", StringType$.MODULE$, false, Metadata.empty()); - StructField field1 = new StructField("f1", StringType$.MODULE$, true, Metadata.empty()); - StructType sparkSchemaWithNullable = new StructType(new StructField[]{field0, field1}); - String sparkSchemaWithNullableJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( - spark2ParquetConverter.convert(sparkSchemaWithNullable).asGroupType()); - StructType convertedSparkSchemaWithNullable = (StructType) StructType.fromJson(sparkSchemaWithNullableJson); - assertEquals(sparkSchemaWithNullable.json(), convertedSparkSchemaWithNullable.json()); - } - - @Test - public void testConvertComplexType() { - StructType sparkSchema = parser.parseTableSchema( - "f0 int, f1 map, f2 array" - + ",f3 map, bigint>, f4 array>" - + ",f5 struct"); - String sparkSchemaJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( - spark2ParquetConverter.convert(sparkSchema).asGroupType()); - StructType convertedSparkSchema = (StructType) StructType.fromJson(sparkSchemaJson); - assertEquals(sparkSchema.json(), convertedSparkSchema.json()); - // Test complex type with nullable - StructField field0 = new StructField("f0", new ArrayType(StringType$.MODULE$, true), false, Metadata.empty()); - StructField field1 = new StructField("f1", new MapType(StringType$.MODULE$, IntegerType$.MODULE$, true), false, Metadata.empty()); - StructType sparkSchemaWithNullable = new StructType(new StructField[]{field0, field1}); - String sparkSchemaWithNullableJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson( - spark2ParquetConverter.convert(sparkSchemaWithNullable).asGroupType()); - StructType convertedSparkSchemaWithNullable = (StructType) StructType.fromJson(sparkSchemaWithNullableJson); - assertEquals(sparkSchemaWithNullable.json(), convertedSparkSchemaWithNullable.json()); - } 
-} diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index 1107d744e68b0..90f6017f1c060 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -108,10 +108,6 @@ public FileSystem getFs() { return fs; } - public boolean isBootstrap() { - return metaClient.getTableConfig().getBootstrapBasePath().isPresent(); - } - public void closeQuietly(ResultSet resultSet, Statement stmt) { try { if (stmt != null) {
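[Reviewer aside, not part of the patch] With IS_QUERY_AS_RO_TABLE removed, the _ro/_rt distinction is carried by an ordinary datasource option baked into each synced table's serde properties ('hoodie.datasource.query.type'='read_optimized' vs 'snapshot'), so DefaultSource only needs to read QUERY_TYPE_OPT_KEY. A rough usage sketch, assuming Spark forwards the table's storage/serde properties to the relation as options (the premise of the DefaultSource change above); basePath is a placeholder:

    // Querying the synced demo tables picks the view from the serde property:
    spark.sql("select symbol, max(ts) from stock_ticks_mor_ro group by symbol")  // resolves to read_optimized
    spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol")  // resolves to snapshot
    // ...and is equivalent to setting the option explicitly on a path-based read:
    spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
      .load(basePath)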