diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala new file mode 100644 index 0000000000000..eaaf82182a77e --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +object HoodieConversionUtils { + + def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] = + if (opt.isDefined) org.apache.hudi.common.util.Option.of(opt.get) else org.apache.hudi.common.util.Option.empty() + + def toScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] = + if (opt.isPresent) Some(opt.get) else None + +} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index c963806416061..cce6eacb03d73 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -118,11 +118,6 @@ object HoodieSparkUtils extends SparkAdapterSupport { }) } - def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = { - val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) - new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache) - } - /** * @deprecated please use other overload [[createRdd]] */ diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index 9eae74e928c25..8c88bfb001fc2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -198,7 +198,7 @@ protected HoodieTimeline getActiveTimeline() { // that is under the pending compaction process, new log-file will bear the compaction's instant (on the // timeline) in its name, as opposed to the base-file's commit instant. 
To make sure we're not filtering // such log-file we have to _always_ include pending compaction instants into consideration - // TODO(HUDI-3302) re-evaluate whether we should not filter any commits in here + // TODO(HUDI-3302) re-evaluate whether we should filter any commits in here HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline(); if (shouldIncludePendingCommits) { return timeline; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index a0771d1249c75..2c107694a1b28 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -509,19 +509,16 @@ public MessageType readSchemaFromLogFile(Path path) throws IOException { * @return */ public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException { - Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null); - HoodieDataBlock lastBlock = null; - while (reader.hasNext()) { - HoodieLogBlock block = reader.next(); - if (block instanceof HoodieDataBlock) { - lastBlock = (HoodieDataBlock) block; + try (Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null)) { + HoodieDataBlock lastBlock = null; + while (reader.hasNext()) { + HoodieLogBlock block = reader.next(); + if (block instanceof HoodieDataBlock) { + lastBlock = (HoodieDataBlock) block; + } } + return lastBlock != null ? new AvroSchemaConverter().convert(lastBlock.getSchema()) : null; } - reader.close(); - if (lastBlock != null) { - return new AvroSchemaConverter().convert(lastBlock.getSchema()); - } - return null; } public boolean isHasOperationField() { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index d2501ee8dc15e..eb44769d9ff32 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -19,21 +19,11 @@ package org.apache.hudi.hadoop.utils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.FileSlice; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit; import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo; @@ -41,14 +31,6 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import 
java.util.stream.Collectors; -import java.util.stream.Stream; - import static org.apache.hudi.TypeUtils.unsafeCast; public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { @@ -67,41 +49,6 @@ public static boolean doesBelongToIncrementalQuery(FileSplit s) { return false; } - // Return parquet file with a list of log files in the same file group. - public static List, List>> groupLogsByBaseFile(Configuration conf, List partitionPaths) { - Set partitionSet = new HashSet<>(partitionPaths); - // TODO(vc): Should we handle also non-hoodie splits here? - Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet); - - // Get all the base file and it's log files pairs in required partition paths. - List, List>> baseAndLogsList = new ArrayList<>(); - partitionSet.forEach(partitionPath -> { - // for each partition path obtain the data & log file groupings, then map back to inputsplits - HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); - HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); - String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath); - - try { - // Both commit and delta-commits are included - pick the latest completed one - Option latestCompletedInstant = - metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant(); - - Stream latestFileSlices = latestCompletedInstant - .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp())) - .orElse(Stream.empty()); - - latestFileSlices.forEach(fileSlice -> { - List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); - baseAndLogsList.add(Pair.of(fileSlice.getBaseFile(), logFilePaths)); - }); - } catch (Exception e) { - throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e); - } - }); - return baseAndLogsList; - } - - /** * Add a field to the existing fields projected. */ diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala new file mode 100644 index 0000000000000..da4e8d30e206f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hudi.HoodieBaseRelation.createBaseFileReader +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources.{BaseRelation, Filter} +import org.apache.spark.sql.types.StructType + +/** + * [[BaseRelation]] implementation only reading Base files of Hudi tables, essentially supporting following querying + * modes: + *
<ul> + *   <li>For COW tables: Snapshot</li> + *   <li>For MOR tables: Read-optimized</li> + * </ul>
+ * + * NOTE: The reason this Relation is used in liue of Spark's default [[HadoopFsRelation]] is primarily due to the + * fact that it injects real partition's path as the value of the partition field, which Hudi ultimately persists + * as part of the record payload. In some cases, however, partition path might not necessarily be equal to the + * verbatim value of the partition path field (when custom [[KeyGenerator]] is used) therefore leading to incorrect + * partition field values being written + */ +class BaseFileOnlyRelation(sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + optParams: Map[String, String], + userSchema: Option[StructType], + globPaths: Seq[Path]) + extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport { + + override type FileSplit = HoodieBaseFileSplit + + protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit], + partitionSchema: StructType, + tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + filters: Array[Filter]): HoodieUnsafeRDD = { + val baseFileReader = createBaseFileReader( + spark = sparkSession, + partitionSchema = partitionSchema, + tableSchema = tableSchema, + requiredSchema = requiredSchema, + filters = filters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = new Configuration(conf) + ) + + new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits) + } + + protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = { + val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters) + val fileSplits = partitions.values.toSeq.flatMap { files => + files.flatMap { file => + // TODO move to adapter + // TODO fix, currently assuming parquet as underlying format + HoodieDataSourceHelper.splitFiles( + sparkSession = sparkSession, + file = file, + // TODO clarify why this is required + partitionValues = InternalRow.empty + ) + } + } + + val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + + sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes).map(HoodieBaseFileSplit.apply) + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala deleted file mode 100644 index 473bb2e2445bd..0000000000000 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hudi.HoodieBaseRelation.createBaseFileReader -import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.hadoop.HoodieROTablePathFilter -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.sources.{BaseRelation, Filter} -import org.apache.spark.sql.types.StructType - -/** - * [[BaseRelation]] implementation only reading Base files of Hudi tables, essentially supporting following querying - * modes: - *
<ul> - *   <li>For COW tables: Snapshot</li> - *   <li>For MOR tables: Read-optimized</li> - * </ul>
- * - * NOTE: The reason this Relation is used in liue of Spark's default [[HadoopFsRelation]] is primarily due to the - * fact that it injects real partition's path as the value of the partition field, which Hudi ultimately persists - * as part of the record payload. In some cases, however, partition path might not necessarily be equal to the - * verbatim value of the partition path field (when custom [[KeyGenerator]] is used) therefore leading to incorrect - * partition field values being written - */ -class BaseFileOnlyViewRelation(sqlContext: SQLContext, - metaClient: HoodieTableMetaClient, - optParams: Map[String, String], - userSchema: Option[StructType], - globPaths: Seq[Path]) - extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport { - - private val fileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams, - FileStatusCache.getOrCreate(sqlContext.sparkSession)) - - override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = { - // NOTE: In case list of requested columns doesn't contain the Primary Key one, we - // have to add it explicitly so that - // - Merging could be performed correctly - // - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]], - // Spark still fetches all the rows to execute the query correctly - // - // It's okay to return columns that have not been requested by the caller, as those nevertheless will be - // filtered out upstream - val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns) - - val (requiredAvroSchema, requiredStructSchema) = - HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns) - - val filterExpressions = convertToExpressions(filters) - val (partitionFilters, dataFilters) = HoodieCatalystExpressionUtils.splitPartitionAndDataPredicates( - sparkSession, filterExpressions, partitionColumns) - - val filePartitions = getPartitions(partitionFilters, dataFilters) - - val partitionSchema = StructType(Nil) - val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString) - val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString) - - val baseFileReader = createBaseFileReader( - spark = sparkSession, - partitionSchema = partitionSchema, - tableSchema = tableSchema, - requiredSchema = requiredSchema, - filters = filters, - options = optParams, - // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it - // to configure Parquet reader appropriately - hadoopConf = new Configuration(conf) - ) - - new HoodieFileScanRDD(sparkSession, baseFileReader, filePartitions) - } - - private def getPartitions(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FilePartition] = { - val partitionDirectories = if (globPaths.isEmpty) { - val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams, - FileStatusCache.getOrCreate(sqlContext.sparkSession)) - hoodieFileIndex.listFiles(partitionFilters, dataFilters) - } else { - sqlContext.sparkContext.hadoopConfiguration.setClass( - "mapreduce.input.pathFilter.class", - classOf[HoodieROTablePathFilter], - classOf[org.apache.hadoop.fs.PathFilter]) - - val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sparkSession, globPaths) - inMemoryFileIndex.listFiles(partitionFilters, dataFilters) - } - - val partitions = partitionDirectories.flatMap { partition => - partition.files.flatMap { file => - // TODO move to adapter - // TODO 
fix, currently assuming parquet as underlying format - HoodieDataSourceHelper.splitFiles( - sparkSession = sparkSession, - file = file, - // TODO clarify why this is required - partitionValues = InternalRow.empty - ) - } - } - - val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes - - sparkAdapter.getFilePartitions(sparkSession, partitions, maxSplitBytes) - } - - private def convertToExpressions(filters: Array[Filter]): Array[Expression] = { - val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema) - - val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty } - if (failedExprs.nonEmpty) { - val failedFilters = failedExprs.map(p => filters(p._2)) - logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})") - } - - catalystExpressions.filter(_.isDefined).map(_.get).toArray - } -} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 7d558ec8cd10e..65dbdee127902 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -107,7 +107,7 @@ class DefaultSource extends RelationProvider case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) | (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) => - new BaseFileOnlyViewRelation(sqlContext, metaClient, parameters, userSchema, globPaths) + new BaseFileOnlyRelation(sqlContext, metaClient, parameters, userSchema, globPaths) case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index e07b316d48db3..11778da63db31 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -20,22 +20,28 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path, PathFilter} import org.apache.hadoop.hbase.io.hfile.CacheConfig import org.apache.hadoop.mapred.JobConf -import org.apache.hudi.HoodieBaseRelation.isMetadataTable +import org.apache.hudi.HoodieBaseRelation.{getPartitionPath, isMetadataTable} +import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.common.config.SerializableConfiguration import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.StringUtils +import org.apache.hudi.hadoop.HoodieROTablePathFilter import 
org.apache.hudi.io.storage.HoodieHFileReader import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata} +import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} +import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionDirectory, PartitionedFile} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} import org.apache.spark.sql.types.StructType @@ -44,6 +50,8 @@ import org.apache.spark.sql.{Row, SQLContext, SparkSession} import scala.collection.JavaConverters._ import scala.util.Try +trait HoodieFileSplit {} + case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String) case class HoodieTableState(recordKeyField: String, @@ -53,36 +61,33 @@ case class HoodieTableState(recordKeyField: String, * Hoodie BaseRelation which extends [[PrunedFilteredScan]]. */ abstract class HoodieBaseRelation(val sqlContext: SQLContext, - metaClient: HoodieTableMetaClient, - optParams: Map[String, String], + val metaClient: HoodieTableMetaClient, + val optParams: Map[String, String], userSchema: Option[StructType]) extends BaseRelation with PrunedFilteredScan with Logging { + type FileSplit <: HoodieFileSplit + + imbueConfigs(sqlContext) + protected val sparkSession: SparkSession = sqlContext.sparkSession protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration) protected lazy val jobConf = new JobConf(conf) + protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig + + protected lazy val basePath: String = metaClient.getBasePath + // If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one // NOTE: This is historical behavior which is preserved as is protected lazy val recordKeyField: String = - if (metaClient.getTableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD - else metaClient.getTableConfig.getRecordKeyFieldProp + if (tableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD + else tableConfig.getRecordKeyFieldProp protected lazy val preCombineFieldOpt: Option[String] = getPrecombineFieldProperty - /** - * @VisibleInTests - */ - lazy val mandatoryColumns: Seq[String] = { - if (isMetadataTable(metaClient)) { - Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE) - } else { - Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) - } - } - - protected lazy val specifiedQueryInstant: Option[String] = + protected lazy val specifiedQueryTimestamp: Option[String] = optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) .map(HoodieSqlCommonUtils.formatQueryInstant) @@ -100,25 +105,49 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) - protected val partitionColumns: Array[String] = metaClient.getTableConfig.getPartitionFields.orElse(Array.empty) + protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) - protected def getPrecombineFieldProperty: Option[String] = - 
Option(metaClient.getTableConfig.getPreCombineField) - .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match { - // NOTE: This is required to compensate for cases when empty string is used to stub - // property value to avoid it being set with the default value - // TODO(HUDI-3456) cleanup - case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f) - case _ => None + /** + * NOTE: PLEASE READ THIS CAREFULLY + * + * Even though [[HoodieFileIndex]] initializes eagerly listing all of the files w/in the given Hudi table, + * this variable itself is _lazy_ (and have to stay that way) which guarantees that it's not initialized, until + * it's actually accessed + */ + protected lazy val fileIndex: HoodieFileIndex = + HoodieFileIndex(sparkSession, metaClient, Some(tableStructSchema), optParams, + FileStatusCache.getOrCreate(sparkSession)) + + /** + * @VisibleInTests + */ + lazy val mandatoryColumns: Seq[String] = { + if (isMetadataTable(metaClient)) { + Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE) + } else { + // TODO this is MOR table requirement, not necessary for COW + Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) } + } + + protected def timeline: HoodieTimeline = + // NOTE: We're including compaction here since it's not considering a "commit" operation + metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants + + protected def latestInstant: Option[HoodieInstant] = + toScalaOption(timeline.lastInstant()) + + protected def queryTimestamp: Option[String] = { + specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(i => i.getTimestamp)) + } override def schema: StructType = tableStructSchema /** * This method controls whether relation will be producing *
<ul> - *   <li>[[Row]], when it's being equal to true</li> - *   <li>[[InternalRow]], when it's being equal to false</li> + *   <li>[[Row]], when it's being equal to true</li> + *   <li>[[InternalRow]], when it's being equal to false</li> * </ul>
* * Returning [[InternalRow]] directly enables us to save on needless ser/de loop from [[InternalRow]] (being @@ -130,22 +159,129 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * NOTE: DO NOT OVERRIDE THIS METHOD */ override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + // NOTE: In case list of requested columns doesn't contain the Primary Key one, we + // have to add it explicitly so that + // - Merging could be performed correctly + // - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]], + // Spark still fetches all the rows to execute the query correctly + // + // It's okay to return columns that have not been requested by the caller, as those nevertheless will be + // filtered out upstream + val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns) + + val (requiredAvroSchema, requiredStructSchema) = + HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns) + + val filterExpressions = convertToExpressions(filters) + val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate) + + val fileSplits = collectFileSplits(partitionFilters, dataFilters) + + val partitionSchema = StructType(Nil) + val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString) + val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString) + // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]] // Please check [[needConversion]] scala-doc for more details - doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]] + if (fileSplits.nonEmpty) + composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]] + else + sparkSession.sparkContext.emptyRDD + } + + /** + * Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied + * + * @param fileSplits file splits to be handled by the RDD + * @param partitionSchema target table's partition schema + * @param tableSchema target table's schema + * @param requiredSchema projected schema required by the reader + * @param filters data filters to be applied + * @return instance of RDD (implementing [[HoodieUnsafeRDD]]) + */ + protected def composeRDD(fileSplits: Seq[FileSplit], + partitionSchema: StructType, + tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + filters: Array[Filter]): HoodieUnsafeRDD + + /** + * Provided with partition and date filters collects target file splits to read records from, while + * performing pruning if necessary + * + * @param partitionFilters partition filters to be applied + * @param dataFilters data filters to be applied + * @return list of [[FileSplit]] to fetch records from + */ + protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit] + + protected def listLatestBaseFiles(globbedPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = { + val partitionDirs = if (globbedPaths.isEmpty) { + fileIndex.listFiles(partitionFilters, dataFilters) + } else { + val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths) + inMemoryFileIndex.listFiles(partitionFilters, dataFilters) + } + + val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray) + val latestBaseFiles = 
fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus) + + latestBaseFiles.groupBy(getPartitionPath) } - protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] + protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = { + val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema) + + val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty } + if (failedExprs.nonEmpty) { + val failedFilters = failedExprs.map(p => filters(p._2)) + logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})") + } + + catalystExpressions.filter(_.isDefined).map(_.get).toArray + } + + /** + * Checks whether given expression only references partition columns + * (and involves no sub-query) + */ + protected def isPartitionPredicate(condition: Expression): Boolean = { + // Validates that the provided names both resolve to the same entity + val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver + + condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } && + !SubqueryExpression.hasSubquery(condition) + } protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = { val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col)) requestedColumns ++ missing } + + private def getPrecombineFieldProperty: Option[String] = + Option(tableConfig.getPreCombineField) + .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match { + // NOTE: This is required to compensate for cases when empty string is used to stub + // property value to avoid it being set with the default value + // TODO(HUDI-3456) cleanup + case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f) + case _ => None + } + + private def imbueConfigs(sqlContext: SQLContext): Unit = { + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true") + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true") + // TODO(HUDI-3639) vectorized reader has to be disabled to make sure MORIncrementalRelation is working properly + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false") + } } object HoodieBaseRelation { - def isMetadataTable(metaClient: HoodieTableMetaClient) = + def getPartitionPath(fileStatus: FileStatus): Path = + fileStatus.getPath.getParent + + def isMetadataTable(metaClient: HoodieTableMetaClient): Boolean = HoodieTableMetadata.isMetadataTable(metaClient.getBasePath) /** diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala index dd90d724c6b61..00de2f756e912 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieBaseFile import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.exception.HoodieException +import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging 
import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -157,7 +158,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext, logInfo("Building file index..") val fileStatuses = if (globPaths.nonEmpty) { // Load files from the global paths if it has defined to be compatible with the original mode - val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths) + val inMemoryFileIndex = HoodieInMemoryFileIndex.create(_sqlContext.sparkSession, globPaths) inMemoryFileIndex.allFiles() } else { // Load files by the HoodieFileIndex. HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), optParams, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala index 40299cfdcd6f1..0871487b5e8c6 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -65,20 +65,6 @@ object HoodieDataSourceHelper extends PredicateHelper { } } - /** - * Extract the required schema from [[InternalRow]] - */ - def extractRequiredSchema( - iter: Iterator[InternalRow], - requiredSchema: StructType, - requiredFieldPos: Seq[Int]): Iterator[InternalRow] = { - val unsafeProjection = UnsafeProjection.create(requiredSchema) - val rows = iter.map { row => - unsafeProjection(createInternalRowWithSchema(row, requiredSchema, requiredFieldPos)) - } - rows - } - /** * Convert [[InternalRow]] to [[SpecificInternalRow]]. */ diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 6aa6fbb0eeaa9..de863203d6d5e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -348,7 +348,7 @@ object HoodieFileIndex extends Logging { } } catch { case NonFatal(e) => - logWarning("Fail to convert filters for TimestampBaseAvroKeyGenerator.") + logWarning("Fail to convert filters for TimestampBaseAvroKeyGenerator", e) partitionFilters } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala index 7e8f62bd2500a..0349cd853b601 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala @@ -24,12 +24,14 @@ import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException} import org.apache.spark.{Partition, TaskContext} +case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit + /** * TODO eval if we actually need it */ class HoodieFileScanRDD(@transient private val sparkSession: SparkSession, readFunction: PartitionedFile => Iterator[InternalRow], - @transient fileSplits: Seq[FilePartition]) + @transient fileSplits: Seq[HoodieBaseFileSplit]) extends HoodieUnsafeRDD(sparkSession.sparkContext) { override def 
compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { @@ -77,5 +79,5 @@ class HoodieFileScanRDD(@transient private val sparkSession: SparkSession, iterator.asInstanceOf[Iterator[InternalRow]] } - override protected def getPartitions: Array[Partition] = fileSplits.toArray + override protected def getPartitions: Array[Partition] = fileSplits.map(_.filePartition).toArray } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 3a518da32b3dd..cc2915d605ff7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -56,7 +56,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, tableState: HoodieTableState, tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, - @transient fileSplits: List[HoodieMergeOnReadFileSplit]) + @transient fileSplits: Seq[HoodieMergeOnReadFileSplit]) extends HoodieUnsafeRDD(sc) { private val confBroadcast = sc.broadcast(new SerializableWritable(config)) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 8308e3b7ee8ad..2517252d700fb 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -20,65 +20,134 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{GlobPattern, Path} import org.apache.hudi.HoodieBaseRelation.createBaseFileReader -import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.HoodieConversionUtils.toScalaOption +import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath +import org.apache.hudi.common.model.{FileSlice, HoodieRecord} import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, getWritePartitionPaths, listAffectedFilesForCommits} -import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, SQLContext} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.immutable /** - * Experimental. - * Relation, that implements the Hoodie incremental view for Merge On Read table. 
- * + * @Experimental */ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, - val optParams: Map[String, String], - val userSchema: Option[StructType], - val metaClient: HoodieTableMetaClient) - extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) { - - private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants() - if (commitTimeline.empty()) { - throw new HoodieException("No instants to incrementally pull") + optParams: Map[String, String], + userSchema: Option[StructType], + metaClient: HoodieTableMetaClient) + extends MergeOnReadSnapshotRelation(sqlContext, optParams, userSchema, Seq(), metaClient) with HoodieIncrementalRelationTrait { + + override type FileSplit = HoodieMergeOnReadFileSplit + + override protected def timeline: HoodieTimeline = { + val startTimestamp = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key) + val endTimestamp = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp) + super.timeline.findInstantsInRange(startTimestamp, endTimestamp) } - if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME.key)) { - throw new HoodieException(s"Specify the begin instant time to pull from using " + - s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}") + + protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit], + partitionSchema: StructType, + tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + filters: Array[Filter]): HoodieMergeOnReadRDD = { + val fullSchemaParquetReader = createBaseFileReader( + spark = sqlContext.sparkSession, + partitionSchema = partitionSchema, + tableSchema = tableSchema, + requiredSchema = tableSchema, + // This file-reader is used to read base file records, subsequently merging them with the records + // stored in delta-log files. 
As such, we have to read _all_ records from the base file, while avoiding + // applying any user-defined filtering _before_ we complete combining them w/ delta-log records (to make sure that + // we combine them correctly) + // + // The only filtering applicable here is the filtering to make sure we're only fetching records that + // fall into incremental span of the timeline being queried + filters = incrementalSpanRecordFilters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = new Configuration(conf) + ) + + val requiredSchemaParquetReader = createBaseFileReader( + spark = sqlContext.sparkSession, + partitionSchema = partitionSchema, + tableSchema = tableSchema, + requiredSchema = requiredSchema, + filters = filters ++ incrementalSpanRecordFilters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = new Configuration(conf) + ) + + val hoodieTableState = HoodieTableState(HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt) + + // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately + // filtered, since file-reader might not be capable to perform filtering + new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, + requiredSchemaParquetReader, hoodieTableState, tableSchema, requiredSchema, fileSplits) } - if (!metaClient.getTableConfig.populateMetaFields()) { - throw new HoodieException("Incremental queries are not supported when meta fields are disabled") + + override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { + if (includedCommits.isEmpty) { + List() + } else { + val latestCommit = includedCommits.last.getTimestamp + val commitsMetadata = includedCommits.map(getCommitMetadata(_, timeline)).asJava + + val modifiedFiles = listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata) + val fsView = new HoodieTableFileSystemView(metaClient, timeline, modifiedFiles) + + val modifiedPartitions = getWritePartitionPaths(commitsMetadata) + + val fileSlices = modifiedPartitions.asScala.flatMap { relativePartitionPath => + fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala + }.toSeq + + buildSplits(filterFileSlices(fileSlices, globPattern)) + } } - private val lastInstant = commitTimeline.lastInstant().get() - private val mergeType = optParams.getOrElse( - DataSourceReadOptions.REALTIME_MERGE.key, - DataSourceReadOptions.REALTIME_MERGE.defaultValue) + private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern: String): Seq[FileSlice] = { + val filteredFileSlices = if (!StringUtils.isNullOrEmpty(pathGlobPattern)) { + val globMatcher = new GlobPattern("*" + pathGlobPattern) + fileSlices.filter(fileSlice => { + val path = toScalaOption(fileSlice.getBaseFile).map(_.getPath) + .orElse(toScalaOption(fileSlice.getLatestLogFile).map(_.getPath.toString)) + .get + globMatcher.matches(path) + }) + } else { + fileSlices + } + filteredFileSlices + } +} - private val commitsTimelineToReturn = commitTimeline.findInstantsInRange( - optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key), - optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, lastInstant.getTimestamp)) - 
logDebug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}") - private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList +trait HoodieIncrementalRelationTrait extends HoodieBaseRelation { - private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf) + // Validate this Incremental implementation is properly configured + validate() - private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex() + protected lazy val includedCommits: immutable.Seq[HoodieInstant] = timeline.getInstants.iterator().asScala.toList // Record filters making sure that only records w/in the requested bounds are being fetched as part of the // scan collected by this relation - private lazy val incrementalSpanRecordsFilters: Seq[Filter] = { + protected lazy val incrementalSpanRecordFilters: Seq[Filter] = { val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD) - val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp) - val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp) + val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.head.getTimestamp) + val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.last.getTimestamp) + Seq(isNotNullFilter, largerThanFilter, lessThanFilter) } @@ -89,132 +158,23 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) } - override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = { - if (fileIndex.isEmpty) { - sqlContext.sparkContext.emptyRDD[InternalRow] - } else { - logDebug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}") - logDebug(s"buildScan filters = ${filters.mkString(",")}") - - // config to ensure the push down filter for parquet will be applied. - sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true") - sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true") - sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false") - - val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns) - - val (requiredAvroSchema, requiredStructSchema) = - HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns) - - val partitionSchema = StructType(Nil) - val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString) - val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString) - - val fullSchemaParquetReader = createBaseFileReader( - spark = sqlContext.sparkSession, - partitionSchema = partitionSchema, - tableSchema = tableSchema, - requiredSchema = tableSchema, - // This file-reader is used to read base file records, subsequently merging them with the records - // stored in delta-log files. 
As such, we have to read _all_ records from the base file, while avoiding - // applying any user-defined filtering _before_ we complete combining them w/ delta-log records (to make sure that - // we combine them correctly) - // - // The only filtering applicable here is the filtering to make sure we're only fetching records that - // fall into incremental span of the timeline being queried - filters = incrementalSpanRecordsFilters, - options = optParams, - // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it - // to configure Parquet reader appropriately - hadoopConf = new Configuration(conf) - ) - val requiredSchemaParquetReader = createBaseFileReader( - spark = sqlContext.sparkSession, - partitionSchema = partitionSchema, - tableSchema = tableSchema, - requiredSchema = requiredSchema, - filters = filters ++ incrementalSpanRecordsFilters, - options = optParams, - // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it - // to configure Parquet reader appropriately - hadoopConf = new Configuration(conf) - ) - - val hoodieTableState = HoodieTableState(HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt) - - // TODO implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately - // filtered, since file-reader might not be capable to perform filtering - new HoodieMergeOnReadRDD( - sqlContext.sparkContext, - jobConf, - fullSchemaParquetReader, - requiredSchemaParquetReader, - hoodieTableState, - tableSchema, - requiredSchema, - fileIndex - ) + protected def validate(): Unit = { + if (super.timeline.empty()) { + throw new HoodieException("No instants to incrementally pull") } - } - def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = { - val metadataList = commitsToReturn.map(instant => getCommitMetadata(instant, commitsTimelineToReturn)) - val affectedFileStatus = listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), metadataList) - val fsView = new HoodieTableFileSystemView(metaClient, commitsTimelineToReturn, affectedFileStatus) - - // Iterate partitions to create splits - val fileGroups = getWritePartitionPaths(metadataList).flatMap(partitionPath => - fsView.getAllFileGroups(partitionPath).iterator() - ).toList - val latestCommit = fsView.getLastInstant.get.getTimestamp - if (log.isDebugEnabled) { - fileGroups.foreach(f => logDebug(s"current file group id: " + - s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get.toString}")) + if (!this.optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME.key)) { + throw new HoodieException(s"Specify the begin instant time to pull from using " + + s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}") } - // Filter files based on user defined glob pattern - val pathGlobPattern = optParams.getOrElse( - DataSourceReadOptions.INCR_PATH_GLOB.key, - DataSourceReadOptions.INCR_PATH_GLOB.defaultValue) - val filteredFileGroup = if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) { - val globMatcher = new GlobPattern("*" + pathGlobPattern) - fileGroups.filter(fg => { - val latestFileSlice = fg.getLatestFileSlice.get - if (latestFileSlice.getBaseFile.isPresent) { - globMatcher.matches(latestFileSlice.getBaseFile.get.getPath) - } else { - globMatcher.matches(latestFileSlice.getLatestLogFile.get.getPath.toString) - } - }) - } else { - fileGroups + if (!this.tableConfig.populateMetaFields()) { + throw new HoodieException("Incremental queries are not supported when meta fields are disabled") } - - // Build 
HoodieMergeOnReadFileSplit. - filteredFileGroup.map(f => { - // Ensure get the base file when there is a pending compaction, which means the base file - // won't be in the latest file slice. - val baseFiles = f.getAllFileSlices.iterator().filter(slice => slice.getBaseFile.isPresent).toList - val partitionedFile = if (baseFiles.nonEmpty) { - val baseFile = baseFiles.head.getBaseFile - val filePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath) - Option(PartitionedFile(InternalRow.empty, filePath, 0, baseFile.get.getFileLen)) - } - else { - Option.empty - } - - val logPath = if (f.getLatestFileSlice.isPresent) { - // If log path doesn't exist, we still include an empty path to avoid using - // the default parquet reader to ensure the push down filter will be applied. - Option(f.getLatestFileSlice.get().getLogFiles.iterator().toList) - } - else { - Option.empty - } - - HoodieMergeOnReadFileSplit(partitionedFile, logPath, - latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) - }) } + + protected def globPattern: String = + optParams.getOrElse(DataSourceReadOptions.INCR_PATH_GLOB.key, DataSourceReadOptions.INCR_PATH_GLOB.defaultValue) + } + diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 6156054b4f45b..d2515e3297d0f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -21,16 +21,18 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.HoodieBaseRelation.createBaseFileReader -import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord} +import org.apache.hudi.HoodieConversionUtils.toScalaOption +import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath +import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.HoodieTableFileSystemView -import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes -import org.apache.spark.rdd.RDD +import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile} +import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType @@ -41,43 +43,28 @@ case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile], latestCommit: String, tablePath: String, maxCompactionMemoryInBytes: Long, - mergeType: String) + mergeType: String) extends HoodieFileSplit class MergeOnReadSnapshotRelation(sqlContext: SQLContext, optParams: Map[String, String], - val userSchema: Option[StructType], - val globPaths: Seq[Path], - val metaClient: HoodieTableMetaClient) + userSchema: Option[StructType], + globPaths: Seq[Path], + metaClient: HoodieTableMetaClient) extends HoodieBaseRelation(sqlContext, 
metaClient, optParams, userSchema) { + override type FileSplit = HoodieMergeOnReadFileSplit + private val mergeType = optParams.getOrElse( DataSourceReadOptions.REALTIME_MERGE.key, DataSourceReadOptions.REALTIME_MERGE.defaultValue) private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf) - override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = { - log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}") - log.debug(s" buildScan filters = ${filters.mkString(",")}") - - // NOTE: In case list of requested columns doesn't contain the Primary Key one, we - // have to add it explicitly so that - // - Merging could be performed correctly - // - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]], - // Spark still fetches all the rows to execute the query correctly - // - // It's okay to return columns that have not been requested by the caller, as those nevertheless will be - // filtered out upstream - val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns) - - val (requiredAvroSchema, requiredStructSchema) = - HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns) - val fileIndex = buildFileIndex(filters) - - val partitionSchema = StructType(Nil) - val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString) - val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString) - + protected override def composeRDD(fileIndex: Seq[HoodieMergeOnReadFileSplit], + partitionSchema: StructType, + tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + filters: Array[Filter]): HoodieMergeOnReadRDD = { val fullSchemaParquetReader = createBaseFileReader( spark = sqlContext.sparkSession, partitionSchema = partitionSchema, @@ -93,6 +80,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, // to configure Parquet reader appropriately hadoopConf = new Configuration(conf) ) + val requiredSchemaParquetReader = createBaseFileReader( spark = sqlContext.sparkSession, partitionSchema = partitionSchema, @@ -111,90 +99,53 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, requiredSchemaParquetReader, tableState, tableSchema, requiredSchema, fileIndex) } - def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = { - if (globPaths.nonEmpty) { - // Load files from the global paths if it has defined to be compatible with the original mode - val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths) - val fsView = new HoodieTableFileSystemView(metaClient, - // file-slice after pending compaction-requested instant-time is also considered valid - metaClient.getCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants, - inMemoryFileIndex.allFiles().toArray) - val partitionPaths = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent) + protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { + val convertedPartitionFilters = + HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters) - - if (partitionPaths.isEmpty) { // If this an empty table, return an empty split list. 
- List.empty[HoodieMergeOnReadFileSplit] - } else { - val lastInstant = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.lastInstant() - if (!lastInstant.isPresent) { // Return empty list if the table has no commit - List.empty - } else { - val queryInstant = specifiedQueryInstant.getOrElse(lastInstant.get().getTimestamp) - val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala - val fileSplits = baseAndLogsList.map(kv => { - val baseFile = kv.getLeft - val logPaths = if (kv.getRight.isEmpty) Option.empty else Option(kv.getRight.asScala.toList) - - val baseDataPath = if (baseFile.isPresent) { - Some(PartitionedFile( - InternalRow.empty, - MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath), - 0, baseFile.get.getFileLen) - ) - } else { - None - } - HoodieMergeOnReadFileSplit(baseDataPath, logPaths, queryInstant, - metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) - }).toList - fileSplits - } - } + if (globPaths.isEmpty) { + val fileSlices = fileIndex.listFileSlices(convertedPartitionFilters) + buildSplits(fileSlices.values.flatten.toSeq) } else { - // Load files by the HoodieFileIndex. - val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient, - Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) - - // Get partition filter and convert to catalyst expression - val partitionColumns = hoodieFileIndex.partitionSchema.fieldNames.toSet - val partitionFilters = filters.filter(f => f.references.forall(p => partitionColumns.contains(p))) - val partitionFilterExpression = - HoodieSparkUtils.convertToCatalystExpression(partitionFilters, tableStructSchema) - val convertedPartitionFilterExpression = - HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilterExpression.toSeq) - - // If convert success to catalyst expression, use the partition prune - val fileSlices = if (convertedPartitionFilterExpression.nonEmpty) { - hoodieFileIndex.listFileSlices(convertedPartitionFilterExpression) + // TODO refactor to avoid iterating over listed files multiple times + val partitions = listLatestBaseFiles(globPaths, convertedPartitionFilters, dataFilters) + val partitionPaths = partitions.keys.toSeq + if (partitionPaths.isEmpty || latestInstant.isEmpty) { + // If this an empty table OR it has no completed commits yet, return + List.empty[HoodieMergeOnReadFileSplit] } else { - hoodieFileIndex.listFileSlices(Seq.empty[Expression]) + val fileSlices = listFileSlices(partitionPaths) + buildSplits(fileSlices) } + } + } - if (fileSlices.isEmpty) { - // If this an empty table, return an empty split list. 
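A hedged, reader-level sketch of the two collectFileSplits branches above (not part of the patch): spark and basePath are assumed, and whether a read lands in the glob branch depends on how DefaultSource resolves the supplied paths, so treat the wildcard pattern as illustrative only.

// No glob paths supplied: the globPaths.isEmpty branch above is taken, and file slices
// come from HoodieFileIndex.listFileSlices(...) with partition pruning applied.
val pruned = spark.read.format("hudi").load(basePath)

// Globbed load (wildcard depth depends on the table's partitioning): the else-branch above
// lists the globbed folders via listLatestBaseFiles(...) and then the latest merged file
// slices per partition path.
val globbed = spark.read.format("hudi").load(s"$basePath/2021/*/*")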
- List.empty[HoodieMergeOnReadFileSplit] - } else { - val fileSplits = fileSlices.values.flatten.map(fileSlice => { - val latestInstant = metaClient.getActiveTimeline.getCommitsTimeline - .filterCompletedInstants.lastInstant().get().getTimestamp - val queryInstant = specifiedQueryInstant.getOrElse(latestInstant) - - val partitionedFile = if (fileSlice.getBaseFile.isPresent) { - val baseFile = fileSlice.getBaseFile.get() - val baseFilePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.getFileStatus.getPath) - Option(PartitionedFile(InternalRow.empty, baseFilePath, 0, baseFile.getFileLen)) - } else { - Option.empty - } - - val logPaths = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList - val logPathsOptional = if (logPaths.isEmpty) Option.empty else Option(logPaths) - - HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, queryInstant, metaClient.getBasePath, - maxCompactionMemoryInBytes, mergeType) - }).toList - fileSplits + protected def buildSplits(fileSlices: Seq[FileSlice]): List[HoodieMergeOnReadFileSplit] = { + fileSlices.map { fileSlice => + val baseFile = toScalaOption(fileSlice.getBaseFile) + val logFiles = Option(fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList) + + val partitionedBaseFile = baseFile.map { file => + val filePath = getFilePath(file.getFileStatus.getPath) + PartitionedFile(InternalRow.empty, filePath, 0, file.getFileLen) } + + HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles, queryTimestamp.get, + metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) + }.toList + } + + private def listFileSlices(partitionPaths: Seq[Path]): Seq[FileSlice] = { + // NOTE: It's critical for us to re-use [[InMemoryFileIndex]] to make sure we're leveraging + // [[FileStatusCache]] and avoid listing the whole table again + val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, partitionPaths) + val fsView = new HoodieTableFileSystemView(metaClient, timeline, inMemoryFileIndex.allFiles.toArray) + + val queryTimestamp = this.queryTimestamp.get + + partitionPaths.flatMap { partitionPath => + val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath) + fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq } } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 74b8e10c4415b..a06ffffe50e50 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ -import scala.language.implicitConversions /** * Implementation of the [[BaseHoodieTableFileIndex]] for Spark @@ -135,12 +134,14 @@ class SparkHoodieTableFileIndex(spark: SparkSession, * Fetch list of latest base files w/ corresponding log files, after performing * partition pruning * + * TODO unify w/ HoodieFileIndex#listFiles + * * @param partitionFilters partition column filters * @return mapping from string partition paths to its base/log files */ def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = { // Prune the 
partition path by the partition filters - val prunedPartitions = prunePartition(cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters) + val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet().asScala.toSeq, partitionFilters) prunedPartitions.map(partition => { (partition.path, cachedAllInputFileSlices.get(partition).asScala) }).toMap diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieHadoopFSUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieHadoopFSUtils.scala new file mode 100644 index 0000000000000..353d94a7c105f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieHadoopFSUtils.scala @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.viewfs.ViewFileSystem +import org.apache.hadoop.fs._ +import org.apache.hadoop.hdfs.DistributedFileSystem +import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.util.SerializableConfiguration + +import java.io.FileNotFoundException +import scala.collection.mutable + +/** + * NOTE: This method class is replica of HadoopFSUtils from Spark 3.2.1, with the following adjustments + * + * - Filtering out of the listed files is adjusted to include files starting w/ "." (to include Hoodie Delta Log + * files) + */ +object HoodieHadoopFSUtils extends Logging { + /** + * Lists a collection of paths recursively. Picks the listing strategy adaptively depending + * on the number of paths to list. + * + * This may only be called on the driver. + * + * @param sc Spark context used to run parallel listing. + * @param paths Input paths to list + * @param hadoopConf Hadoop configuration + * @param filter Path filter used to exclude leaf files from result + * @param ignoreMissingFiles Ignore missing files that happen during recursive listing + * (e.g., due to race conditions) + * @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false, + * this will return `FileStatus` without `BlockLocation` info. + * @param parallelismThreshold The threshold to enable parallelism. If the number of input paths + * is smaller than this value, this will fallback to use + * sequential listing. + * @param parallelismMax The maximum parallelism for listing. If the number of input paths is + * larger than this value, parallelism will be throttled to this value + * to avoid generating too many tasks. 
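As a usage sketch of the listing utility documented above (parallelListLeafFiles, defined just below), not part of the patch and assuming an active SparkSession named spark plus an existing directory to scan; the named arguments mirror how HoodieInMemoryFileIndex wires it up later in this patch:

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.HoodieHadoopFSUtils

val listed: Seq[(Path, Seq[FileStatus])] =
  HoodieHadoopFSUtils.parallelListLeafFiles(
    sc = spark.sparkContext,
    paths = Seq(new Path("/tmp/hudi_table")),            // hypothetical location
    hadoopConf = spark.sparkContext.hadoopConfiguration,
    filter = null,                                       // no additional PathFilter
    ignoreMissingFiles = false,
    ignoreLocality = true,                               // skip block-location lookups
    parallelismThreshold = 32,                           // at or below this many paths, list serially on the driver
    parallelismMax = 10000)                              // cap on the number of listing tasks

listed.foreach { case (root, files) => println(s"$root -> ${files.size} leaf files") }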
+ * @return for each input path, the set of discovered files for the path + */ + def parallelListLeafFiles(sc: SparkContext, + paths: Seq[Path], + hadoopConf: Configuration, + filter: PathFilter, + ignoreMissingFiles: Boolean, + ignoreLocality: Boolean, + parallelismThreshold: Int, + parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = { + parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, isRootLevel = true, + ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax) + } + + // scalastyle:off parameter.number + private def parallelListLeafFilesInternal(sc: SparkContext, + paths: Seq[Path], + hadoopConf: Configuration, + filter: PathFilter, + isRootLevel: Boolean, + ignoreMissingFiles: Boolean, + ignoreLocality: Boolean, + parallelismThreshold: Int, + parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = { + + // Short-circuits parallel listing when serial listing is likely to be faster. + if (paths.size <= parallelismThreshold) { + // scalastyle:off return + return paths.map { path => + val leafFiles = listLeafFiles( + path, + hadoopConf, + filter, + Some(sc), + ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, + isRootPath = isRootLevel, + parallelismThreshold = parallelismThreshold, + parallelismMax = parallelismMax) + (path, leafFiles) + } + // scalastyle:on return + } + + logInfo(s"Listing leaf files and directories in parallel under ${paths.length} paths." + + s" The first several paths are: ${paths.take(10).mkString(", ")}.") + HiveCatalogMetrics.incrementParallelListingJobCount(1) + + val serializableConfiguration = new SerializableConfiguration(hadoopConf) + val serializedPaths = paths.map(_.toString) + + // Set the number of parallelism to prevent following file listing from generating many tasks + // in case of large #defaultParallelism. + val numParallelism = Math.min(paths.size, parallelismMax) + + val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) + val statusMap = try { + val description = paths.size match { + case 0 => + "Listing leaf files and directories 0 paths" + case 1 => + s"Listing leaf files and directories for 1 path:
${paths(0)}" + case s => + s"Listing leaf files and directories for $s paths:
${paths(0)}, ..." + } + sc.setJobDescription(description) + sc + .parallelize(serializedPaths, numParallelism) + .mapPartitions { pathStrings => + val hadoopConf = serializableConfiguration.value + pathStrings.map(new Path(_)).toSeq.map { path => + val leafFiles = listLeafFiles( + path = path, + hadoopConf = hadoopConf, + filter = filter, + contextOpt = None, // Can't execute parallel scans on workers + ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, + isRootPath = isRootLevel, + parallelismThreshold = Int.MaxValue, + parallelismMax = 0) + (path, leafFiles) + }.iterator + }.map { case (path, statuses) => + val serializableStatuses = statuses.map { status => + // Turn FileStatus into SerializableFileStatus so we can send it back to the driver + val blockLocations = status match { + case f: LocatedFileStatus => + f.getBlockLocations.map { loc => + SerializableBlockLocation( + loc.getNames, + loc.getHosts, + loc.getOffset, + loc.getLength) + } + + case _ => + Array.empty[SerializableBlockLocation] + } + + SerializableFileStatus( + status.getPath.toString, + status.getLen, + status.isDirectory, + status.getReplication, + status.getBlockSize, + status.getModificationTime, + status.getAccessTime, + blockLocations) + } + (path.toString, serializableStatuses) + }.collect() + } finally { + sc.setJobDescription(previousJobDescription) + } + + // turn SerializableFileStatus back to Status + statusMap.map { case (path, serializableStatuses) => + val statuses = serializableStatuses.map { f => + val blockLocations = f.blockLocations.map { loc => + new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) + } + new LocatedFileStatus( + new FileStatus( + f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, + new Path(f.path)), + blockLocations) + } + (new Path(path), statuses) + } + } + // scalastyle:on parameter.number + + // scalastyle:off parameter.number + /** + * Lists a single filesystem path recursively. If a `SparkContext` object is specified, this + * function may launch Spark jobs to parallelize listing based on `parallelismThreshold`. + * + * If sessionOpt is None, this may be called on executors. + * + * @return all children of path that match the specified filter. + */ + private def listLeafFiles(path: Path, + hadoopConf: Configuration, + filter: PathFilter, + contextOpt: Option[SparkContext], + ignoreMissingFiles: Boolean, + ignoreLocality: Boolean, + isRootPath: Boolean, + parallelismThreshold: Int, + parallelismMax: Int): Seq[FileStatus] = { + + logTrace(s"Listing $path") + val fs = path.getFileSystem(hadoopConf) + + // Note that statuses only include FileStatus for the files and dirs directly under path, + // and does not include anything else recursively. + val statuses: Array[FileStatus] = try { + fs match { + // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode + // to retrieve the file status with the file block location. The reason to still fallback + // to listStatus is because the default implementation would potentially throw a + // FileNotFoundException which is better handled by doing the lookups manually below. + case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality => + val remoteIter = fs.listLocatedStatus(path) + new Iterator[LocatedFileStatus]() { + def next(): LocatedFileStatus = remoteIter.next + + def hasNext(): Boolean = remoteIter.hasNext + }.toArray + case _ => fs.listStatus(path) + } + } catch { + // If we are listing a root path for SQL (e.g. 
a top level directory of a table), we need to + // ignore FileNotFoundExceptions during this root level of the listing because + // + // (a) certain code paths might construct an InMemoryFileIndex with root paths that + // might not exist (i.e. not all callers are guaranteed to have checked + // path existence prior to constructing InMemoryFileIndex) and, + // (b) we need to ignore deleted root paths during REFRESH TABLE, otherwise we break + // existing behavior and break the ability drop SessionCatalog tables when tables' + // root directories have been deleted (which breaks a number of Spark's own tests). + // + // If we are NOT listing a root path then a FileNotFoundException here means that the + // directory was present in a previous level of file listing but is absent in this + // listing, likely indicating a race condition (e.g. concurrent table overwrite or S3 + // list inconsistency). + // + // The trade-off in supporting existing behaviors / use-cases is that we won't be + // able to detect race conditions involving root paths being deleted during + // InMemoryFileIndex construction. However, it's still a net improvement to detect and + // fail-fast on the non-root cases. For more info see the SPARK-27676 review discussion. + case _: FileNotFoundException if isRootPath || ignoreMissingFiles => + logWarning(s"The directory $path was not found. Was it deleted very recently?") + Array.empty[FileStatus] + } + + val filteredStatuses = + statuses.filterNot(status => shouldFilterOutPathName(status.getPath.getName)) + + val allLeafStatuses = { + val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) + val nestedFiles: Seq[FileStatus] = contextOpt match { + case Some(context) if dirs.size > parallelismThreshold => + parallelListLeafFilesInternal( + context, + dirs.map(_.getPath), + hadoopConf = hadoopConf, + filter = filter, + isRootLevel = false, + ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, + parallelismThreshold = parallelismThreshold, + parallelismMax = parallelismMax + ).flatMap(_._2) + case _ => + dirs.flatMap { dir => + listLeafFiles( + path = dir.getPath, + hadoopConf = hadoopConf, + filter = filter, + contextOpt = contextOpt, + ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, + isRootPath = false, + parallelismThreshold = parallelismThreshold, + parallelismMax = parallelismMax) + } + } + val allFiles = topLevelFiles ++ nestedFiles + if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles + } + + val missingFiles = mutable.ArrayBuffer.empty[String] + val resolvedLeafStatuses = allLeafStatuses.flatMap { + case f: LocatedFileStatus => + Some(f) + + // NOTE: + // + // - Although S3/S3A/S3N file system can be quite slow for remote file metadata + // operations, calling `getFileBlockLocations` does no harm here since these file system + // implementations don't actually issue RPC for this method. + // + // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not + // be a big deal since we always use to `parallelListLeafFiles` when the number of + // paths exceeds threshold. + case f if !ignoreLocality => + // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), + // which is very slow on some file system (RawLocalFileSystem, which is launch a + // subprocess and parse the stdout). 
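The filterNot above delegates to shouldFilterOutPathName, defined at the end of this object. Unlike Spark's stock HadoopFSUtils, it no longer excludes names starting with ".", which is what keeps Hudi's dot-prefixed delta log files in the listing. A quick illustrative check (the file names below are made up):

import org.apache.spark.HoodieHadoopFSUtils.shouldFilterOutPathName

// Dot-prefixed Hudi log file name: kept by this replica (Spark's original would drop it)
assert(!shouldFilterOutPathName(".f1e3_20220101000000.log.1_1-0-1"))
// Parquet summary metadata stays visible so Parquet can locate it among leaf files
assert(!shouldFilterOutPathName("_metadata"))
// Marker and in-flight copy files are still excluded
assert(shouldFilterOutPathName("_SUCCESS"))
assert(shouldFilterOutPathName("part-0001.parquet._COPYING_"))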
+ try { + val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc => + // Store BlockLocation objects to consume less memory + if (loc.getClass == classOf[BlockLocation]) { + loc + } else { + new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength) + } + } + val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, + f.getModificationTime, 0, null, null, null, null, f.getPath, locations) + if (f.isSymlink) { + lfs.setSymlink(f.getSymlink) + } + Some(lfs) + } catch { + case _: FileNotFoundException if ignoreMissingFiles => + missingFiles += f.getPath.toString + None + } + + case f => Some(f) + } + + if (missingFiles.nonEmpty) { + logWarning( + s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}") + } + + resolvedLeafStatuses + } + // scalastyle:on parameter.number + + /** A serializable variant of HDFS's BlockLocation. This is required by Hadoop 2.7. */ + private case class SerializableBlockLocation(names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + /** A serializable variant of HDFS's FileStatus. This is required by Hadoop 2.7. */ + private case class SerializableFileStatus(path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** Checks if we should filter out this path name. */ + def shouldFilterOutPathName(pathName: String): Boolean = { + // We filter follow paths: + // 1. everything that starts with _ and ., except _common_metadata and _metadata + // because Parquet needs to find those metadata files from leaf files returned by this method. + // We should refactor this logic to not mix metadata files with data files. + // 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we + // should skip this file in case of double reading. + val exclude = (pathName.startsWith("_") && !pathName.contains("=")) || pathName.endsWith("._COPYING_") + val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata") + exclude && !include + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala new file mode 100644 index 0000000000000..8ad78af7f3267 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.execution.datasources + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path, PathFilter} +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} +import org.apache.spark.HoodieHadoopFSUtils +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.types.StructType + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +class HoodieInMemoryFileIndex(sparkSession: SparkSession, + rootPathsSpecified: Seq[Path], + parameters: Map[String, String], + userSpecifiedSchema: Option[StructType], + fileStatusCache: FileStatusCache = NoopCache) + extends InMemoryFileIndex(sparkSession, rootPathsSpecified, parameters, userSpecifiedSchema, fileStatusCache) { + + /** + * List leaf files of given paths. This method will submit a Spark job to do parallel + * listing whenever there is a path having more files than the parallel partition discovery threshold. + * + * This is publicly visible for testing. + * + * NOTE: This method replicates the one it overrides, however it uses custom method to run parallel + * listing that accepts files starting with "." + */ + override def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + val startTime = System.nanoTime() + val output = mutable.LinkedHashSet[FileStatus]() + val pathsToFetch = mutable.ArrayBuffer[Path]() + for (path <- paths) { + fileStatusCache.getLeafFiles(path) match { + case Some(files) => + HiveCatalogMetrics.incrementFileCacheHits(files.length) + output ++= files + case None => + pathsToFetch += path + } + () // for some reasons scalac 2.12 needs this; return type doesn't matter + } + val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) + val discovered = bulkListLeafFiles(sparkSession, pathsToFetch, filter, hadoopConf) + + discovered.foreach { case (path, leafFiles) => + HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) + fileStatusCache.putLeafFiles(path, leafFiles.toArray) + output ++= leafFiles + } + + logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to list leaf files" + + s" for ${paths.length} paths.") + + output + } + + protected def bulkListLeafFiles(sparkSession: SparkSession, paths: ArrayBuffer[Path], filter: PathFilter, hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = { + HoodieHadoopFSUtils.parallelListLeafFiles( + sc = sparkSession.sparkContext, + paths = paths, + hadoopConf = hadoopConf, + filter = new PathFilterWrapper(filter), + ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles, + // NOTE: We're disabling fetching Block Info to speed up file listing + ignoreLocality = true, + parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold, + parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism) + } +} + +object HoodieInMemoryFileIndex { + def create(sparkSession: SparkSession, globbedPaths: Seq[Path]): HoodieInMemoryFileIndex = { + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + new HoodieInMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache) + } +} + +private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable { + override def accept(path: Path): Boolean = { + (filter == null || filter.accept(path)) && 
!HoodieHadoopFSUtils.shouldFilterOutPathName(path.getName) + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/TestHoodieInMemoryFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/TestHoodieInMemoryFileIndex.scala new file mode 100644 index 0000000000000..8e7f6bf14b7e5 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/TestHoodieInMemoryFileIndex.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.execution.datasources + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.io.TempDir + +import java.io.File +import java.nio.file.Paths + +class TestHoodieInMemoryFileIndex { + + @Test + def testCreateInMemoryIndex(@TempDir tempDir: File): Unit = { + val spark = SparkSession.builder + .appName("Hoodie Datasource test") + .master("local[2]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .getOrCreate + + val folders: Seq[Path] = Seq( + new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri), + new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri) + ) + + val files: Seq[Path] = Seq( + new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri), + new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri), + new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri), + new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri) + ) + + folders.foreach(folder => new File(folder.toUri).mkdir()) + files.foreach(file => new File(file.toUri).createNewFile()) + + val index = HoodieInMemoryFileIndex.create(spark, Seq(folders(0), folders(1))) + val indexedFilePaths = index.allFiles().map(fs => fs.getPath) + assertEquals(files.sortWith(_.toString < _.toString), indexedFilePaths.sortWith(_.toString < _.toString)) + spark.stop() + } + +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala index 9f00b5dcdf64f..39ee6e0fa7187 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -88,35 +88,6 @@ class TestHoodieSparkUtils { .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) } - @Test - def testCreateInMemoryIndex(@TempDir tempDir: File): Unit = { - val spark = SparkSession.builder - 
.appName("Hoodie Datasource test") - .master("local[2]") - .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .getOrCreate - - val folders: Seq[Path] = Seq( - new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri), - new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri) - ) - - val files: Seq[Path] = Seq( - new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri), - new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri), - new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri), - new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri) - ) - - folders.foreach(folder => new File(folder.toUri).mkdir()) - files.foreach(file => new File(file.toUri).createNewFile()) - - val index = HoodieSparkUtils.createInMemoryFileIndex(spark, Seq(folders(0), folders(1))) - val indexedFilePaths = index.allFiles().map(fs => fs.getPath) - assertEquals(files.sortWith(_.toString < _.toString), indexedFilePaths.sortWith(_.toString < _.toString)) - spark.stop() - } - @Test def testCreateRddSchemaEvol(): Unit = { val spark = SparkSession.builder diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala index a963081749455..ca5f79191a729 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala @@ -112,9 +112,9 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with val fullColumnsReadStats: Array[(String, Long)] = if (HoodieSparkUtils.isSpark3) Array( - ("rider", 14665), - ("rider,driver", 14665), - ("rider,driver,tip_history", 14665)) + ("rider", 14166), + ("rider,driver", 14166), + ("rider,driver,tip_history", 14166)) else if (HoodieSparkUtils.isSpark2) // TODO re-enable tests (these tests are very unstable currently) Array( @@ -163,11 +163,29 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with else fail("Only Spark 3 and Spark 2 are currently supported") + // Stats for the reads fetching _all_ columns (currently for MOR to be able to merge + // records properly full row has to be fetched; note, how amount of bytes read + // is invariant of the # of columns) + val fullColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 14166), + ("rider,driver", 14166), + ("rider,driver,tip_history", 14166)) + else if (HoodieSparkUtils.isSpark2) + // TODO re-enable tests (these tests are very unstable currently) + Array( + ("rider", -1), + ("rider,driver", -1), + ("rider,driver,tip_history", -1)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + // Test MOR / Snapshot / Skip-merge runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats) // Test MOR / Snapshot / Payload-combine - runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, projectedColumnsReadStats) + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats) // Test MOR / Read Optimized runTest(tableState, 
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStats)
@@ -209,9 +227,9 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
     val fullColumnsReadStats: Array[(String, Long)] =
       if (HoodieSparkUtils.isSpark3)
         Array(
-          ("rider", 19683),
-          ("rider,driver", 19683),
-          ("rider,driver,tip_history", 19683))
+          ("rider", 19684),
+          ("rider,driver", 19684),
+          ("rider,driver,tip_history", 19684))
       else if (HoodieSparkUtils.isSpark2)
       // TODO re-enable tests (these tests are very unstable currently)
         Array(
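For context on why the payload-combine run now asserts against fullColumnsReadStats: merging log records into base-file rows requires fetching the full row, so the bytes read are invariant of the projection, while skip-merge still benefits from column pruning. A hedged end-to-end sketch, not part of the patch; an active spark session and an existing MOR table at basePath are assumed:

import org.apache.hudi.DataSourceReadOptions

// Snapshot read with payload combine: full rows are fetched so that log records can be
// merged into the base-file rows (bytes read track fullColumnsReadStats).
val combined = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .option(DataSourceReadOptions.REALTIME_MERGE.key, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL)
  .load(basePath)

// Snapshot read with skip merge: no merging, so the projection still limits what is read
// (bytes read track projectedColumnsReadStats).
val skipMerged = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .option(DataSourceReadOptions.REALTIME_MERGE.key, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
  .load(basePath)

combined.select("rider", "driver").count()
skipMerged.select("rider", "driver").count()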