diff --git a/docker/demo/hive-table-check.commands b/docker/demo/hive-table-check.commands index 8cdf033a79565..1102ca53bbbd0 100644 --- a/docker/demo/hive-table-check.commands +++ b/docker/demo/hive-table-check.commands @@ -22,6 +22,13 @@ show partitions stock_ticks_cow; show partitions stock_ticks_mor_ro; show partitions stock_ticks_mor_rt; +show create table stock_ticks_cow; +show create table stock_ticks_mor_ro; +show create table stock_ticks_mor_rt; +show create table stock_ticks_cow_bs; +show create table stock_ticks_mor_bs_ro; +show create table stock_ticks_mor_bs_rt; + !quit diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index a7fbf66f94e16..22d30899c7895 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.BootstrapBaseFileSplit; @@ -49,11 +50,13 @@ import org.apache.log4j.Logger; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -137,16 +140,14 @@ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream> groupLogsByBaseFile(Configuration conf, List fileStatuses) { - Map> partitionsToParquetSplits = - fileStatuses.stream().collect(Collectors.groupingBy(file -> file.getFileStatus().getPath().getParent())); + public static List, List>> groupLogsByBaseFile(Configuration conf, List partitionPaths) { + Set partitionSet = new HashSet<>(partitionPaths); // TODO(vc): Should we handle also non-hoodie splits here? - Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionsToParquetSplits.keySet()); + Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet); - // for all unique split parents, obtain all delta files based on delta commit timeline, - // grouped on file id - Map> resultMap = new HashMap<>(); - partitionsToParquetSplits.keySet().forEach(partitionPath -> { + // Get all the base file and it's log files pairs in required partition paths. + List, List>> baseAndLogsList = new ArrayList<>(); + partitionSet.forEach(partitionPath -> { // for each partition path obtain the data & log file groupings, then map back to inputsplits HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); @@ -161,28 +162,18 @@ public static Map> groupLogsByBaseFile(Configuratio .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp())) .orElse(Stream.empty()); - // subgroup splits again by file id & match with log files. - Map> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream() - .collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getFileStatus().getPath().getName()))); latestFileSlices.forEach(fileSlice -> { - List dataFileSplits = groupedInputSplits.get(fileSlice.getFileId()); - dataFileSplits.forEach(split -> { - try { - List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) + List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); - resultMap.put(split, logFilePaths); - } catch (Exception e) { - throw new HoodieException("Error creating hoodie real time split ", e); - } - }); + baseAndLogsList.add(Pair.of(fileSlice.getBaseFile(), logFilePaths)); }); } catch (Exception e) { throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e); } }); - return resultMap; + return baseAndLogsList; } - + /** * Add a field to the existing fields projected. diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java index 0fa9b0ffa6206..ee6eea95cbf40 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java @@ -241,6 +241,10 @@ private void testHiveAfterFirstBatch() throws Exception { assertStdOutContains(stdOutErrPair, "| partition |\n+----------------+\n| dt=2018-08-31 |\n+----------------+\n", 3); + // There should have 5 data source tables except stock_ticks_mor_bs_rt. + // After [HUDI-2071] has solved, we can inc the number 5 to 6. + assertStdOutContains(stdOutErrPair, "'spark.sql.sources.provider'='hudi'", 5); + stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS); assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n+---------+----------------------+\n" + "| GOOG | 2018-08-31 10:29:00 |\n", 6); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 57cd48edc62ab..75302f051e5be 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -354,7 +354,6 @@ object DataSourceWriteOptions { // HIVE SYNC SPECIFIC CONFIGS // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes // unexpected issues with config getting reset - val HIVE_SYNC_ENABLED_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.hive_sync.enable") .defaultValue("false") @@ -441,16 +440,6 @@ object DataSourceWriteOptions { .withDocumentation("‘INT64’ with original type TIMESTAMP_MICROS is converted to hive ‘timestamp’ type. " + "Disabled by default for backward compatibility.") - val HIVE_TABLE_PROPERTIES: ConfigProperty[String] = ConfigProperty - .key("hoodie.datasource.hive_sync.table_properties") - .noDefaultValue() - .withDocumentation("") - - val HIVE_TABLE_SERDE_PROPERTIES: ConfigProperty[String] = ConfigProperty - .key("hoodie.datasource.hive_sync.serde_properties") - .noDefaultValue() - .withDocumentation("") - val HIVE_SYNC_AS_DATA_SOURCE_TABLE: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.hive_sync.sync_as_datasource") .defaultValue("true") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 1097c420971ff..146971e18661c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.HoodieROTablePathFilter +import org.apache.hudi.hive.util.ConfigUtils import org.apache.log4j.LogManager import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation} @@ -105,8 +106,14 @@ class DefaultSource extends RelationProvider val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build() val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent val tableType = metaClient.getTableType - val queryType = parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue) - log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType") + + // First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool, + // or else use query type from QUERY_TYPE_OPT_KEY. + val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) + .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL) + .getOrElse(parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue())) + + log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType") (tableType, queryType, isBootstrappedTable) match { case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index ca2bf72cfbcb7..3aa41eb8efb6f 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -106,6 +106,12 @@ case class HoodieFileIndex( } } + private lazy val metadataConfig = { + val properties = new Properties() + properties.putAll(options.asJava) + HoodieMetadataConfig.newBuilder.fromProperties(properties).build() + } + @transient @volatile private var fileSystemView: HoodieTableFileSystemView = _ @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _ @transient @volatile private var cachedFileSize: Long = 0L @@ -195,8 +201,8 @@ case class HoodieFileIndex( * @param predicates The filter condition. * @return The Pruned partition paths. */ - private def prunePartition(partitionPaths: Seq[PartitionRowPath], - predicates: Seq[Expression]): Seq[PartitionRowPath] = { + def prunePartition(partitionPaths: Seq[PartitionRowPath], + predicates: Seq[Expression]): Seq[PartitionRowPath] = { val partitionColumnNames = partitionSchema.fields.map(_.name).toSet val partitionPruningPredicates = predicates.filter { @@ -222,26 +228,13 @@ case class HoodieFileIndex( } } - /** - * Load all partition paths and it's files under the query table path. - */ - private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = { + def getAllQueryPartitionPaths: Seq[PartitionRowPath] = { val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) - val properties = new Properties() - properties.putAll(options.asJava) - val metadataConfig = HoodieMetadataConfig.newBuilder.fromProperties(properties).build() - val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath) // Load all the partition path from the basePath, and filter by the query partition path. // TODO load files from the queryPartitionPath directly. val partitionPaths = FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, basePath).asScala .filter(_.startsWith(queryPartitionPath)) - - val writeConfig = HoodieWriteConfig.newBuilder() - .withPath(basePath).withProperties(properties).build() - val maxListParallelism = writeConfig.getFileListingParallelism - - val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf()) val partitionSchema = _partitionSchemaFromProperties val timeZoneId = CaseInsensitiveMap(options) .get(DateTimeUtils.TIMEZONE_OPTION) @@ -250,7 +243,7 @@ case class HoodieFileIndex( val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark .sessionState.conf) // Convert partition path to PartitionRowPath - val partitionRowPaths = partitionPaths.map { partitionPath => + partitionPaths.map { partitionPath => val partitionRow = if (partitionSchema.fields.length == 0) { // This is a non-partitioned table InternalRow.empty @@ -308,7 +301,20 @@ case class HoodieFileIndex( } PartitionRowPath(partitionRow, partitionPath) } + } + + /** + * Load all partition paths and it's files under the query table path. + */ + private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = { + val properties = new Properties() + properties.putAll(options.asJava) + val writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath).withProperties(properties).build() + val maxListParallelism = writeConfig.getFileListingParallelism + val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf()) + val partitionRowPaths = getAllQueryPartitionPaths // List files in all of the partition path. val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]() val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]() diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index aa835ff52b137..6ccc50b600ff6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -19,7 +19,6 @@ package org.apache.hudi import java.util import java.util.Properties - import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -36,24 +35,21 @@ import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils} import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException -import org.apache.hudi.hive.util.ConfigUtils import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} import org.apache.hudi.internal.DataSourceInternalWriterHelper -import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.log4j.LogManager import org.apache.spark.SPARK_VERSION import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.hudi.HoodieSqlUtils -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession} import scala.collection.JavaConversions._ import scala.collection.mutable.ListBuffer +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} object HoodieSparkSqlWriter { @@ -397,15 +393,15 @@ object HoodieSparkSqlWriter { } } - private def syncHive(basePath: Path, fs: FileSystem, hoodieConfig: HoodieConfig): Boolean = { - val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig) + private def syncHive(basePath: Path, fs: FileSystem, hoodieConfig: HoodieConfig, sqlConf: SQLConf): Boolean = { + val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig, sqlConf) val hiveConf: HiveConf = new HiveConf() hiveConf.addResource(fs.getConf) new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable() true } - private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig): HiveSyncConfig = { + private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig, sqlConf: SQLConf): HiveSyncConfig = { val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig() hiveSyncConfig.basePath = basePath.toString hiveSyncConfig.baseFileFormat = hoodieConfig.getString(HIVE_BASE_FILE_FORMAT_OPT_KEY) @@ -430,76 +426,14 @@ object HoodieSparkSqlWriter { hiveSyncConfig.decodePartition = hoodieConfig.getStringOrDefault(URL_ENCODE_PARTITIONING_OPT_KEY).toBoolean hiveSyncConfig.batchSyncNum = hoodieConfig.getStringOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM).toInt - val syncAsDtaSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean - if (syncAsDtaSourceTable) { - hiveSyncConfig.tableProperties = hoodieConfig.getStringOrDefault(HIVE_TABLE_PROPERTIES, null) - val serdePropText = createSqlTableSerdeProperties(hoodieConfig, basePath.toString) - val serdeProp = ConfigUtils.toMap(serdePropText) - serdeProp.put(ConfigUtils.SPARK_QUERY_TYPE_KEY, DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key) - serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) - serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) - - hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp) - } + hiveSyncConfig.saveAsSparkDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean + hiveSyncConfig.sparkSchemaLengthThreshold = sqlConf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD) hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE) hiveSyncConfig } - /** - * Add Spark Sql related table properties to the HIVE_TABLE_PROPERTIES. - * @param sqlConf The spark sql conf. - * @param schema The schema to write to the table. - * @param hoodieConfig The HoodieConfig contains origin parameters. - * @return A new parameters added the HIVE_TABLE_PROPERTIES property. - */ - private def addSqlTableProperties(sqlConf: SQLConf, schema: StructType, - hoodieConfig: HoodieConfig): HoodieConfig = { - // Convert the schema and partition info used by spark sql to hive table properties. - // The following code refers to the spark code in - // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala - - // Sync schema with meta fields - val schemaWithMetaFields = HoodieSqlUtils.addMetaFields(schema) - val partitionSet = hoodieConfig.getString(HIVE_PARTITION_FIELDS_OPT_KEY) - .split(",").map(_.trim).filter(!_.isEmpty).toSet - val threshold = sqlConf.getConf(SCHEMA_STRING_LENGTH_THRESHOLD) - - val (partitionCols, dataCols) = schemaWithMetaFields.partition(c => partitionSet.contains(c.name)) - val reOrderedType = StructType(dataCols ++ partitionCols) - val schemaParts = reOrderedType.json.grouped(threshold).toSeq - - var properties = Map( - "spark.sql.sources.provider" -> "hudi", - "spark.sql.sources.schema.numParts" -> schemaParts.size.toString - ) - schemaParts.zipWithIndex.foreach { case (part, index) => - properties += s"spark.sql.sources.schema.part.$index" -> part - } - // add partition columns - if (partitionSet.nonEmpty) { - properties += "spark.sql.sources.schema.numPartCols" -> partitionSet.size.toString - partitionSet.zipWithIndex.foreach { case (partCol, index) => - properties += s"spark.sql.sources.schema.partCol.$index" -> partCol - } - } - var sqlPropertyText = ConfigUtils.configToString(properties) - sqlPropertyText = if (hoodieConfig.contains(HIVE_TABLE_PROPERTIES)) { - sqlPropertyText + "\n" + hoodieConfig.getString(HIVE_TABLE_PROPERTIES) - } else { - sqlPropertyText - } - hoodieConfig.setValue(HIVE_TABLE_PROPERTIES, sqlPropertyText) - hoodieConfig - } - private def createSqlTableSerdeProperties(hoodieConfig: HoodieConfig, basePath: String): String = { - val pathProp = s"path=$basePath" - if (hoodieConfig.contains(HIVE_TABLE_SERDE_PROPERTIES)) { - pathProp + "\n" + hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES) - } else { - pathProp - } - } + private def metaSync(spark: SparkSession, hoodieConfig: HoodieConfig, basePath: Path, schema: StructType): Boolean = { @@ -508,7 +442,6 @@ object HoodieSparkSqlWriter { var syncClientToolClassSet = scala.collection.mutable.Set[String]() hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS).split(",").foreach(syncClass => syncClientToolClassSet += syncClass) - val newHoodieConfig = addSqlTableProperties(spark.sessionState.conf, schema, hoodieConfig) // for backward compatibility if (hiveSyncEnabled) { metaSyncEnabled = true @@ -521,12 +454,12 @@ object HoodieSparkSqlWriter { val syncSuccess = impl.trim match { case "org.apache.hudi.hive.HiveSyncTool" => { log.info("Syncing to Hive Metastore (URL: " + hoodieConfig.getString(HIVE_URL_OPT_KEY) + ")") - syncHive(basePath, fs, newHoodieConfig) + syncHive(basePath, fs, hoodieConfig, spark.sessionState.conf) true } case _ => { val properties = new Properties() - properties.putAll(newHoodieConfig.getProps) + properties.putAll(hoodieConfig.getProps) properties.put("basePath", basePath.toString) val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool] syncHoodie.syncHoodieTable() diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index f20bb4d250a1d..381df113246bb 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -19,7 +19,6 @@ package org.apache.hudi import org.apache.avro.Schema -import org.apache.hudi.common.model.HoodieBaseFile import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils @@ -137,12 +136,15 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, } def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = { - - val fileStatuses = if (globPaths.isDefined) { + // Get all partition paths + val partitionPaths = if (globPaths.isDefined) { // Load files from the global paths if it has defined to be compatible with the original mode val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get) - inMemoryFileIndex.allFiles() - } else { // Load files by the HoodieFileIndex. + val fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline.getCommitsTimeline + .filterCompletedInstants, inMemoryFileIndex.allFiles().toArray) + fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent) + } else { // Load partition path by the HoodieFileIndex. val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) @@ -152,34 +154,35 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, val partitionFilterExpression = HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, tableStructSchema) - // if convert success to catalyst expression, use the partition prune - if (partitionFilterExpression.isDefined) { - hoodieFileIndex.listFiles(Seq(partitionFilterExpression.get), Seq.empty).flatMap(_.files) - } else { - hoodieFileIndex.allFiles - } + val allPartitionPaths = hoodieFileIndex.getAllQueryPartitionPaths + // If convert success to catalyst expression, use the partition prune + hoodieFileIndex.prunePartition(allPartitionPaths, partitionFilterExpression.map(Seq(_)).getOrElse(Seq.empty)) + .map(_.fullPartitionPath(metaClient.getBasePath)) } - if (fileStatuses.isEmpty) { // If this an empty table, return an empty split list. + if (partitionPaths.isEmpty) { // If this an empty table, return an empty split list. List.empty[HoodieMergeOnReadFileSplit] } else { - val fsView = new HoodieTableFileSystemView(metaClient, - metaClient.getActiveTimeline.getCommitsTimeline - .filterCompletedInstants, fileStatuses.toArray) - val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList - - if (!fsView.getLastInstant.isPresent) { // Return empty list if the table has no commit + val lastInstant = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.lastInstant() + if (!lastInstant.isPresent) { // Return empty list if the table has no commit List.empty } else { - val latestCommit = fsView.getLastInstant.get().getTimestamp - val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala - val fileSplits = fileGroup.map(kv => { - val baseFile = kv._1 - val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList) - val filePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.getFileStatus.getPath) - - val partitionedFile = PartitionedFile(InternalRow.empty, filePath, 0, baseFile.getFileLen) - HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit, + val latestCommit = lastInstant.get().getTimestamp + val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala + val fileSplits = baseAndLogsList.map(kv => { + val baseFile = kv.getLeft + val logPaths = if (kv.getRight.isEmpty) Option.empty else Option(kv.getRight.asScala.toList) + + val baseDataPath = if (baseFile.isPresent) { + Some(PartitionedFile( + InternalRow.empty, + MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath), + 0, baseFile.get.getFileLen) + ) + } else { + None + } + HoodieMergeOnReadFileSplit(baseDataPath, logPaths, latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) }).toList fileSplits diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java index ebf23242a5279..992c59312c10a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java @@ -18,6 +18,12 @@ package org.apache.hudi.testutils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.FileIOUtils; import org.apache.avro.Schema; @@ -89,4 +95,27 @@ public static List generateRandomRowsEvolvedSchema(int count) { } return toReturn; } + + /** + * Test if there is only log files exists in the table. + */ + public static boolean isLogFileOnly(String basePath) throws IOException { + Configuration conf = new Configuration(); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(conf).setBasePath(basePath) + .build(); + String baseDataFormat = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + Path path = new Path(basePath); + FileSystem fs = path.getFileSystem(conf); + RemoteIterator files = fs.listFiles(path, true); + while (files.hasNext()) { + LocatedFileStatus file = files.next(); + if (file.isFile()) { + if (file.getPath().toString().endsWith(baseDataFormat)) { + return false; + } + } + } + return true; + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala index a06eeb188d75a..cca73fafda682 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala @@ -36,8 +36,7 @@ import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUt import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.functions.{expr, lit} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{spy, times, verify} @@ -530,11 +529,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { test("Test build sync config for spark sql") { initSparkContext("test build sync config") - val addSqlTablePropertiesMethod = - HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties", - classOf[SQLConf], classOf[StructType], classOf[HoodieConfig]) - addSqlTablePropertiesMethod.setAccessible(true) - val schema = DataSourceTestUtils.getStructTypeExampleSchema val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val basePath = "/tmp/hoodie_test" @@ -547,49 +541,23 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { ) val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params) val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters) - val newHoodieConfig = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter, - spark.sessionState.conf, structType, hoodieConfig) - .asInstanceOf[HoodieConfig] val buildSyncConfigMethod = HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path], - classOf[HoodieConfig]) + classOf[HoodieConfig], classOf[SQLConf]) buildSyncConfigMethod.setAccessible(true) val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter, - new Path(basePath), newHoodieConfig).asInstanceOf[HiveSyncConfig] - + new Path(basePath), hoodieConfig, spark.sessionState.conf).asInstanceOf[HiveSyncConfig] assertTrue(hiveSyncConfig.skipROSuffix) assertTrue(hiveSyncConfig.createManagedTable) - assertResult("spark.sql.sources.provider=hudi\n" + - "spark.sql.sources.schema.partCol.0=partition\n" + - "spark.sql.sources.schema.numParts=1\n" + - "spark.sql.sources.schema.numPartCols=1\n" + - "spark.sql.sources.schema.part.0=" + - "{\"type\":\"struct\",\"fields\":[{\"name\":\"_hoodie_commit_time\"," + - "\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":" + - "\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + - "{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + - "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + - "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + - "{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," + - "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," + - "{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties) - assertResult("path=/tmp/hoodie_test\n" + - "spark.query.type.key=hoodie.datasource.query.type\n" + - "spark.query.as.rt.key=snapshot\n" + - "spark.query.as.ro.key=read_optimized")(hiveSyncConfig.serdeProperties) + assertTrue(hiveSyncConfig.saveAsSparkDataSourceTable) + assertResult(spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD))(hiveSyncConfig.sparkSchemaLengthThreshold) } test("Test build sync config for skip Ro Suffix vals") { initSparkContext("test build sync config for skip Ro suffix vals") - val addSqlTablePropertiesMethod = - HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties", - classOf[SQLConf], classOf[StructType], classOf[HoodieConfig]) - addSqlTablePropertiesMethod.setAccessible(true) - val schema = DataSourceTestUtils.getStructTypeExampleSchema - val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val basePath = "/tmp/hoodie_test" val params = Map( "path" -> basePath, @@ -598,18 +566,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { ) val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params) val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters) - val newHoodieConfig = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter, - spark.sessionState.conf, structType, hoodieConfig) - .asInstanceOf[HoodieConfig] val buildSyncConfigMethod = HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path], - classOf[HoodieConfig]) + classOf[HoodieConfig], classOf[SQLConf]) buildSyncConfigMethod.setAccessible(true) val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter, - new Path(basePath), newHoodieConfig).asInstanceOf[HiveSyncConfig] - + new Path(basePath), hoodieConfig, spark.sessionState.conf).asInstanceOf[HiveSyncConfig] assertFalse(hiveSyncConfig.skipROSuffix) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 5574c01fb14a9..1028eefd250eb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -17,20 +17,24 @@ package org.apache.hudi.functional +import org.apache.hadoop.fs.Path + import scala.collection.JavaConverters._ import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY} import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.DefaultHoodieRecordPayload +import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.testutils.HoodieTestDataGenerator -import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.keygen.NonpartitionedKeyGenerator -import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase} import org.apache.log4j.LogManager import org.apache.spark.sql._ import org.apache.spark.sql.functions._ -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, ValueSource} @@ -677,4 +681,23 @@ class TestMORDataSource extends HoodieClientTestBase { assertEquals(partitionCounts("2021/03/03"), count7) } + + @Test + def testReadLogOnlyMergeOnReadTable(): Unit = { + initMetaClient(HoodieTableType.MERGE_ON_READ) + val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20) + val inputDF = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + // Use InMemoryIndex to generate log only mor table. + .option(HoodieIndexConfig.INDEX_TYPE_PROP.key, IndexType.INMEMORY.toString) + .mode(SaveMode.Overwrite) + .save(basePath) + // There should no base file in the file list. + assertTrue(DataSourceTestUtils.isLogFileOnly(basePath)) + // Test read log only mor table. + assertEquals(20, spark.read.format("hudi").load(basePath).count()) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala new file mode 100644 index 0000000000000..79d2730b308a9 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hudi.testutils.DataSourceTestUtils + +class TestMergeIntoTable2 extends TestHoodieSqlBase { + + test("Test Query Log Only MOR Table") { + withTempDir { tmp => + // Create table with INMEMORY index to generate log only mor table. + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}' + | options ( + | primaryKey ='id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true' + | ) + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)") + // 3 commits will not trigger compaction, so it should be log only. + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1000), + Seq(3, "a3", 10.0, 1000) + ) + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, 'a1' as name, 11 as price, 1001 as ts + | ) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + // 4 commits will not trigger compaction, so it should be log only. + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 11.0, 1001), + Seq(2, "a2", 10.0, 1000), + Seq(3, "a3", 10.0, 1000) + ) + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, 'a4' as name, 11 as price, 1000 as ts + | ) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + + // 5 commits will trigger compaction. + assertResult(false)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 11.0, 1001), + Seq(2, "a2", 10.0, 1000), + Seq(3, "a3", 10.0, 1000), + Seq(4, "a4", 11.0, 1000) + ) + } + } +} diff --git a/hudi-sync/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml index fd63028951047..1af35e6d7e00c 100644 --- a/hudi-sync/hudi-hive-sync/pom.xml +++ b/hudi-sync/hudi-hive-sync/pom.xml @@ -53,6 +53,12 @@ ${project.version} + + org.apache.spark + spark-sql_${scala.binary.version} + provided + + log4j diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java index 09c3e7b354f2b..95d953bad697b 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java @@ -110,6 +110,12 @@ public class HiveSyncConfig implements Serializable { @Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive") public Integer batchSyncNum = 1000; + @Parameter(names = {"--sparkDataSource"}, description = "Whether save this table as spark data source table.") + public Boolean saveAsSparkDataSourceTable = true; + + @Parameter(names = {"--spark-schemaLengthThreshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.") + public int sparkSchemaLengthThreshold = 4000; + // enhance the similar function in child class public static HiveSyncConfig copy(HiveSyncConfig cfg) { HiveSyncConfig newConfig = new HiveSyncConfig(); @@ -131,6 +137,8 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) { newConfig.serdeProperties = cfg.serdeProperties; newConfig.createManagedTable = cfg.createManagedTable; newConfig.batchSyncNum = cfg.batchSyncNum; + newConfig.saveAsSparkDataSourceTable = cfg.saveAsSparkDataSourceTable; + newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold; return newConfig; } @@ -160,6 +168,8 @@ public String toString() { + ", supportTimestamp=" + supportTimestamp + ", decodePartition=" + decodePartition + ", createManagedTable=" + createManagedTable + + ", saveAsSparkDataSourceTable=" + saveAsSparkDataSourceTable + + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold + '}'; } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 7264c8dffea9d..1d469f5f38279 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.InvalidTableException; @@ -38,8 +39,14 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StringType$; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -152,6 +159,16 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, // Get the parquet schema for this table looking at the latest commit MessageType schema = hoodieHiveClient.getDataSchema(); + + // Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table, + // so we disable the saveAsSparkDataSourceTable here to avoid read such kind table + // by the data source way (which will use the HoodieBootstrapRelation). + // TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logical. + if (hoodieHiveClient.isBootstrap() + && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ + && !readAsOptimized) { + cfg.saveAsSparkDataSourceTable = false; + } // Sync schema if needed syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema); @@ -180,6 +197,15 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, */ private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, boolean readAsOptimized, MessageType schema) { + // Append spark table properties & serde properties + Map tableProperties = ConfigUtils.toMap(cfg.tableProperties); + Map serdeProperties = ConfigUtils.toMap(cfg.serdeProperties); + if (cfg.saveAsSparkDataSourceTable) { + Map sparkTableProperties = getSparkTableProperties(cfg.sparkSchemaLengthThreshold, schema); + Map sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized); + tableProperties.putAll(sparkTableProperties); + serdeProperties.putAll(sparkSerdeProperties); + } // Check and sync schema if (!tableExists) { LOG.info("Hive table " + tableName + " is not found. Creating it"); @@ -196,27 +222,11 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat); String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat); - Map serdeProperties = ConfigUtils.toMap(cfg.serdeProperties); - - // The serdeProperties is non-empty only for spark sync meta data currently. - if (!serdeProperties.isEmpty()) { - String queryTypeKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_TYPE_KEY); - String queryAsROKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_AS_RO_KEY); - String queryAsRTKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_AS_RT_KEY); - - if (queryTypeKey != null && queryAsROKey != null && queryAsRTKey != null) { - if (readAsOptimized) { // read optimized - serdeProperties.put(queryTypeKey, queryAsROKey); - } else { // read snapshot - serdeProperties.put(queryTypeKey, queryAsRTKey); - } - } - } // Custom serde will not work with ALTER TABLE REPLACE COLUMNS // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive // /ql/exec/DDLTask.java#L3488 hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, - outputFormatClassName, serDeFormatClassName, serdeProperties, ConfigUtils.toMap(cfg.tableProperties)); + outputFormatClassName, serDeFormatClassName, serdeProperties, tableProperties); } else { // Check if the table schema has evolved Map tableSchema = hoodieHiveClient.getTableSchema(tableName); @@ -226,7 +236,6 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi hoodieHiveClient.updateTableDefinition(tableName, schema); // Sync the table properties if the schema has changed if (cfg.tableProperties != null) { - Map tableProperties = ConfigUtils.toMap(cfg.tableProperties); hoodieHiveClient.updateTableProperties(tableName, tableProperties); LOG.info("Sync table properties for " + tableName + ", table properties is: " + cfg.tableProperties); } @@ -236,6 +245,70 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi } } + /** + * Get Spark Sql related table properties. This is used for spark datasource table. + * @param schema The schema to write to the table. + * @return A new parameters added the spark's table properties. + */ + private Map getSparkTableProperties(int schemaLengthThreshold, MessageType schema) { + // Convert the schema and partition info used by spark sql to hive table properties. + // The following code refers to the spark code in + // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala + + StructType sparkSchema = + new ParquetToSparkSchemaConverter(false, true).convert(schema); + List partitionNames = cfg.partitionFields; + List partitionCols = new ArrayList<>(); + List dataCols = new ArrayList<>(); + Map column2Field = new HashMap<>(); + for (StructField field : sparkSchema.fields()) { + column2Field.put(field.name(), field); + } + for (String partitionName : partitionNames) { + // Default the unknown partition fields to be String. + // Keep the same logical with HiveSchemaUtil#getPartitionKeyType. + partitionCols.add(column2Field.getOrDefault(partitionName, + new StructField(partitionName, StringType$.MODULE$, false, Metadata.empty()))); + } + for (StructField field : sparkSchema.fields()) { + if (!partitionNames.contains(field.name())) { + dataCols.add(field); + } + } + List reOrderedFields = new ArrayList<>(); + reOrderedFields.addAll(dataCols); + reOrderedFields.addAll(partitionCols); + StructType reOrderedType = new StructType(reOrderedFields.toArray(new StructField[]{})); + + Map sparkProperties = new HashMap<>(); + sparkProperties.put("spark.sql.sources.provider", "hudi"); + // Split the schema string to multi-parts according the schemaLengthThreshold size. + String schemaString = reOrderedType.json(); + int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold; + sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart)); + // Add each part of schema string to sparkProperties + for (int i = 0; i < numSchemaPart; i++) { + int start = i * schemaLengthThreshold; + int end = Math.min(start + schemaLengthThreshold, schemaString.length()); + sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end)); + } + // Add partition columns + if (!partitionNames.isEmpty()) { + sparkProperties.put("spark.sql.sources.schema.numPartCols", String.valueOf(partitionNames.size())); + for (int i = 0; i < partitionNames.size(); i++) { + sparkProperties.put("spark.sql.sources.schema.partCol." + i, partitionNames.get(i)); + } + } + return sparkProperties; + } + + private Map getSparkSerdeProperties(boolean readAsOptimized) { + Map sparkSerdeProperties = new HashMap<>(); + sparkSerdeProperties.put("path", cfg.basePath); + sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized)); + return sparkSerdeProperties; + } + /** * Syncs the list of storage parititions passed in (checks if the partition is in hive, if not adds it or if the * partition path does not match, it updates the partition path). diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java index b8745b6e30807..94ebdaadd8ff3 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java @@ -23,12 +23,11 @@ import org.apache.hudi.common.util.StringUtils; public class ConfigUtils { - - public static final String SPARK_QUERY_TYPE_KEY = "spark.query.type.key"; - - public static final String SPARK_QUERY_AS_RO_KEY = "spark.query.as.ro.key"; - - public static final String SPARK_QUERY_AS_RT_KEY = "spark.query.as.rt.key"; + /** + * Config stored in hive serde properties to tell query engine (spark/flink) to + * read the table as a read-optimized table when this config is true. + */ + public static final String IS_QUERY_AS_RO_TABLE = "hoodie.query.as.ro.table"; /** * Convert the key-value config to a map.The format of the config diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index c4125337ea9d8..163113b2f4eb4 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -42,7 +42,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; - import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; @@ -70,6 +69,10 @@ private static Iterable useJdbcAndSchemaFromCommitMetadataAndManagedTa return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}}); } + private static Iterable useJdbcAndSchemaFromCommitMetadataAndSaveAsDataSource() { + return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}}); + } + @BeforeEach public void setUp() throws Exception { HiveTestUtil.setUp(); @@ -157,17 +160,15 @@ public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) } @ParameterizedTest - @MethodSource({"useJdbcAndSchemaFromCommitMetadata"}) + @MethodSource({"useJdbcAndSchemaFromCommitMetadataAndSaveAsDataSource"}) public void testSyncCOWTableWithProperties(boolean useJdbc, - boolean useSchemaFromCommitMetadata) throws Exception { + boolean useSchemaFromCommitMetadata, + boolean saveAsDataSourceTable) throws Exception { HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; Map serdeProperties = new HashMap() { { put("path", hiveSyncConfig.basePath); - put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type"); - put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized"); - put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot"); } }; @@ -177,6 +178,7 @@ public void testSyncCOWTableWithProperties(boolean useJdbc, put("tp_1", "p1"); } }; + hiveSyncConfig.saveAsSparkDataSourceTable = saveAsDataSourceTable; hiveSyncConfig.useJdbc = useJdbc; hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties); hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties); @@ -195,9 +197,12 @@ public void testSyncCOWTableWithProperties(boolean useJdbc, String tblPropertiesWithoutDdlTime = String.join("\n", results.subList(0, results.size() - 1)); + + String sparkTableProperties = getSparkTableProperties(saveAsDataSourceTable, useSchemaFromCommitMetadata); assertEquals( "EXTERNAL\tTRUE\n" + "last_commit_time_sync\t100\n" + + sparkTableProperties + "tp_0\tp0\n" + "tp_1\tp1", tblPropertiesWithoutDdlTime); assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime")); @@ -208,21 +213,54 @@ public void testSyncCOWTableWithProperties(boolean useJdbc, hiveDriver.getResults(results); String ddl = String.join("\n", results); assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'")); - assertTrue(ddl.contains("'hoodie.datasource.query.type'='snapshot'")); + if (saveAsDataSourceTable) { + assertTrue(ddl.contains("'" + ConfigUtils.IS_QUERY_AS_RO_TABLE + "'='false'")); + } + } + + private String getSparkTableProperties(boolean saveAsDataSourceTable, boolean useSchemaFromCommitMetadata) { + if (saveAsDataSourceTable) { + if (useSchemaFromCommitMetadata) { + return "spark.sql.sources.provider\thudi\n" + + "spark.sql.sources.schema.numPartCols\t1\n" + + "spark.sql.sources.schema.numParts\t1\n" + + "spark.sql.sources.schema.part.0\t{\"type\":\"struct\",\"fields\":" + + "[{\"name\":\"_hoodie_commit_time\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," + + "{\"name\":\"favorite_number\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}," + + "{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," + + "{\"name\":\"datestr\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}\n" + + "spark.sql.sources.schema.partCol.0\tdatestr\n"; + } else { + return "spark.sql.sources.provider\thudi\n" + + "spark.sql.sources.schema.numPartCols\t1\n" + + "spark.sql.sources.schema.numParts\t1\n" + + "spark.sql.sources.schema.part.0\t{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":" + + "\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"favorite_number\",\"type\":\"integer\"," + + "\"nullable\":false,\"metadata\":{}},{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false," + + "\"metadata\":{}}]}\n" + + "{\"name\":\"datestr\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}\n" + + "spark.sql.sources.schema.partCol.0\tdatestr\n"; + } + } else { + return ""; + } } @ParameterizedTest - @MethodSource({"useJdbcAndSchemaFromCommitMetadata"}) + @MethodSource({"useJdbcAndSchemaFromCommitMetadataAndSaveAsDataSource"}) public void testSyncMORTableWithProperties(boolean useJdbc, - boolean useSchemaFromCommitMetadata) throws Exception { + boolean useSchemaFromCommitMetadata, + boolean saveAsDataSourceTable) throws Exception { HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; Map serdeProperties = new HashMap() { { put("path", hiveSyncConfig.basePath); - put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type"); - put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized"); - put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot"); } }; @@ -232,6 +270,7 @@ public void testSyncMORTableWithProperties(boolean useJdbc, put("tp_1", "p1"); } }; + hiveSyncConfig.saveAsSparkDataSourceTable = saveAsDataSourceTable; hiveSyncConfig.useJdbc = useJdbc; hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties); hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties); @@ -247,14 +286,15 @@ public void testSyncMORTableWithProperties(boolean useJdbc, String rtTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; String[] tableNames = new String[] {roTableName, rtTableName}; - String[] expectQueryTypes = new String[] {"read_optimized", "snapshot"}; + String[] readAsOptimizedResults = new String[] {"true", "false"}; SessionState.start(HiveTestUtil.getHiveConf()); Driver hiveDriver = new org.apache.hadoop.hive.ql.Driver(HiveTestUtil.getHiveConf()); + String sparkTableProperties = getSparkTableProperties(saveAsDataSourceTable, useSchemaFromCommitMetadata); for (int i = 0;i < 2; i++) { String dbTableName = hiveSyncConfig.databaseName + "." + tableNames[i]; - String expectQueryType = expectQueryTypes[i]; + String readAsOptimized = readAsOptimizedResults[i]; hiveDriver.run("SHOW TBLPROPERTIES " + dbTableName); List results = new ArrayList<>(); @@ -265,6 +305,7 @@ public void testSyncMORTableWithProperties(boolean useJdbc, assertEquals( "EXTERNAL\tTRUE\n" + "last_commit_time_sync\t101\n" + + sparkTableProperties + "tp_0\tp0\n" + "tp_1\tp1", tblPropertiesWithoutDdlTime); assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime")); @@ -275,8 +316,10 @@ public void testSyncMORTableWithProperties(boolean useJdbc, hiveDriver.getResults(results); String ddl = String.join("\n", results); assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'")); - assertTrue(ddl.contains("'hoodie.datasource.query.type'='" + expectQueryType + "'")); assertTrue(ddl.toLowerCase().contains("create external table")); + if (saveAsDataSourceTable) { + assertTrue(ddl.contains("'" + ConfigUtils.IS_QUERY_AS_RO_TABLE + "'='" + readAsOptimized + "'")); + } } } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index 90f6017f1c060..1107d744e68b0 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -108,6 +108,10 @@ public FileSystem getFs() { return fs; } + public boolean isBootstrap() { + return metaClient.getTableConfig().getBootstrapBasePath().isPresent(); + } + public void closeQuietly(ResultSet resultSet, Statement stmt) { try { if (stmt != null) { diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index 2cda97723560f..09cea80a4e846 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -143,6 +143,13 @@ org.apache.hbase:hbase-common commons-codec:commons-codec + org.apache.spark:spark-sql_${scala.binary.version} + org.apache.spark:spark-catalyst_${scala.binary.version} + org.apache.spark:spark-core_${scala.binary.version} + org.json4s:json4s-jackson_${scala.binary.version} + org.json4s:json4s-ast_${scala.binary.version} + org.json4s:json4s-scalap_${scala.binary.version} + org.json4s:json4s-core_${scala.binary.version} @@ -592,8 +599,52 @@ + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + compile + + + + org.apache.spark + spark-sql_${scala.binary.version} + compile + + + + org.apache.spark + spark-core_${scala.binary.version} + compile + + + + org.json4s + json4s-jackson_${scala.binary.version} + 3.5.3 + + + + org.json4s + json4s-ast_${scala.binary.version} + 3.5.3 + + + + org.json4s + json4s-scalap_${scala.binary.version} + 3.5.3 + + + + org.json4s + json4s-core_${scala.binary.version} + 3.5.3 + + flink-bundle-shade-hive1 diff --git a/packaging/hudi-hive-sync-bundle/pom.xml b/packaging/hudi-hive-sync-bundle/pom.xml index a8dbc1e915172..d26b8f2ffe402 100644 --- a/packaging/hudi-hive-sync-bundle/pom.xml +++ b/packaging/hudi-hive-sync-bundle/pom.xml @@ -75,6 +75,13 @@ com.esotericsoftware:kryo-shaded org.objenesis:objenesis com.esotericsoftware:minlog + org.apache.spark:spark-sql_${scala.binary.version} + org.apache.spark:spark-catalyst_${scala.binary.version} + org.apache.spark:spark-core_${scala.binary.version} + org.json4s:json4s-jackson_${scala.binary.version} + org.json4s:json4s-ast_${scala.binary.version} + org.json4s:json4s-scalap_${scala.binary.version} + org.json4s:json4s-core_${scala.binary.version} @@ -140,5 +147,53 @@ avro compile + + org.apache.spark + spark-sql_${scala.binary.version} + compile + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + compile + + + + org.apache.spark + spark-sql_${scala.binary.version} + compile + + + + org.apache.spark + spark-core_${scala.binary.version} + compile + + + + org.json4s + json4s-jackson_${scala.binary.version} + 3.5.3 + + + + org.json4s + json4s-ast_${scala.binary.version} + 3.5.3 + + + + org.json4s + json4s-scalap_${scala.binary.version} + 3.5.3 + + + + org.json4s + json4s-core_${scala.binary.version} + 3.5.3 +