diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java index 6982fdbbdf112..cce024281471d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java @@ -65,6 +65,10 @@ public static Option getTablePath(FileSystem fs, Path path) throws HoodieE return getTablePathFromPartitionPath(fs, directory); } + public static Boolean isHoodieMetaPath(String path) { + return isInsideTableMetadataFolder(path) || isTableMetadataFolder(path); + } + private static boolean isTableMetadataFolder(String path) { return path != null && path.endsWith("/" + HoodieTableMetaClient.METAFOLDER_NAME); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 8017bc3d74860..fe1d91431d757 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -138,7 +138,7 @@ public class HoodieTestDataGenerator { //Maintains all the existing keys schema wise private final Map> existingKeysBySchema; - private final String[] partitionPaths; + private String[] partitionPaths; //maintains the count of existing keys schema wise private Map numKeysBySchema; @@ -805,6 +805,10 @@ public List generateGenericRecords(int numRecords) { return list; } + public void setPartitionPaths(String[] partitionPaths) { + this.partitionPaths = partitionPaths; + } + public String[] getPartitionPaths() { return partitionPaths; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java index d94018b88546f..dada06ad86ea6 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; @@ -148,8 +149,7 @@ public boolean accept(Path path) { // Skip all files that are descendants of .hoodie in its path. 
String filePath = path.toString(); - if (filePath.contains("/" + HoodieTableMetaClient.METAFOLDER_NAME + "/") - || filePath.endsWith("/" + HoodieTableMetaClient.METAFOLDER_NAME)) { + if (TablePathUtils.isHoodieMetaPath(filePath)) { if (LOG.isDebugEnabled()) { LOG.debug(String.format("Skipping Hoodie Metadata file %s \n", filePath)); } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 3299b8f2597bf..83844efda5fa6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -96,7 +96,7 @@ class DefaultSource extends RelationProvider " Falling back to Read Optimized query.") new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams) } else { - new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient) + new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)(sqlContext.sparkSession) } } else { getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index bd55930d1a41d..529101b4f0412 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -23,11 +23,15 @@ import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} +import org.apache.hudi.common.util.TablePathUtils import org.apache.spark.SPARK_VERSION import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.execution.datasources.InMemoryFileIndex.shouldFilterOut import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex} import org.apache.spark.sql.types.{StringType, StructField, StructType} @@ -77,11 +81,13 @@ object HoodieSparkUtils { * @return list of absolute file paths */ def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = { + val globPaths = paths.flatMap(path => { val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory) val globPaths = globPathIfNecessary(fs, qualified) globPaths }) + globPaths.filterNot( path => TablePathUtils.isHoodieMetaPath(path.toString) || shouldFilterOut(path.getName)) } def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = { @@ -89,6 +95,11 @@ object HoodieSparkUtils { new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache) } + def createInMemoryFileIndex(sparkSession: SparkSession, userSpecifiedSchema: Option[StructType], parameters: Map[String, String], globbedPaths: Seq[Path]): InMemoryFileIndex = { + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + new 
InMemoryFileIndex(sparkSession, globbedPaths, parameters, userSpecifiedSchema, fileStatusCache) + } + def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = { val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace) createRdd(df, avroSchema, structName, recordNamespace) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 50e2ec5f30f78..86a9864b82497 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -18,24 +18,29 @@ package org.apache.hudi +import com.google.common.annotations.VisibleForTesting import org.apache.hudi.common.model.HoodieBaseFile import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.JobConf import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat -import org.apache.spark.sql.{Row, SQLContext} -import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.execution.hudi.utils.PushDownUtils +import org.apache.spark.sql.{Row, SQLContext, SparkSession} +import org.apache.spark.sql.sources.{BaseRelation, CatalystScan, Filter, PrunedFilteredScan} +import org.apache.spark.sql.types.{StructField, StructType} +import java.util.Locale import scala.collection.JavaConverters._ +import scala.collection.mutable case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile], logPaths: Option[List[String]], @@ -55,8 +60,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, val optParams: Map[String, String], val userSchema: StructType, val globPaths: Seq[Path], - val metaClient: HoodieTableMetaClient) - extends BaseRelation with PrunedFilteredScan with Logging { + val metaClient: HoodieTableMetaClient)(val sparkSession: SparkSession) + extends BaseRelation with CatalystScan with Logging { private val conf = sqlContext.sparkContext.hadoopConfiguration private val jobConf = new JobConf(conf) @@ -68,7 +73,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL) private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf) - private val fileIndex = buildFileIndex() + private val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, Some(tableStructSchema), optParams, globPaths) + private var fileIndex: List[HoodieMergeOnReadFileSplit] = _ private val preCombineField = { val preCombineFieldFromTableConfig = 
metaClient.getTableConfig.getPreCombineField if (preCombineFieldFromTableConfig != null) { @@ -79,16 +85,33 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD) } } - override def schema: StructType = tableStructSchema + + val partitionStructSchema = inMemoryFileIndex.partitionSpec().partitionColumns + val overlappedPartCols = mutable.Map.empty[String, StructField] + partitionStructSchema.foreach { partitionField => + if (tableStructSchema.exists(getColName(_) == getColName(partitionField))) { + overlappedPartCols += getColName(partitionField) -> partitionField + } + } + + // When data and partition schemas have overlapping columns, the output + // schema respects the order of the data schema for the overlapping columns, and it + // respects the data types of the partition schema. + override def schema: StructType = { + StructType(tableStructSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++ + partitionStructSchema.filterNot(f => overlappedPartCols.contains(getColName(f)))) + } override def needConversion: Boolean = false - override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = { + fileIndex = buildFileIndex(filters) + val pushedFilters = PushDownUtils.transformFilter(filters) log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}") - log.debug(s" buildScan filters = ${filters.mkString(",")}") + log.debug(s" buildScan filters = ${pushedFilters.mkString(",")}") var requiredStructSchema = StructType(Seq()) requiredColumns.foreach(col => { - val field = tableStructSchema.find(_.name == col) + val field = tableStructSchema.find(_.name == col.name) if (field.isDefined) { requiredStructSchema = requiredStructSchema.add(field.get) } @@ -106,18 +129,18 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( sparkSession = sqlContext.sparkSession, dataSchema = tableStructSchema, - partitionSchema = StructType(Nil), + partitionSchema = partitionStructSchema, requiredSchema = tableStructSchema, - filters = filters, + filters = pushedFilters, options = optParams, hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() ) val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( sparkSession = sqlContext.sparkSession, dataSchema = tableStructSchema, - partitionSchema = StructType(Nil), + partitionSchema = partitionStructSchema, requiredSchema = requiredStructSchema, - filters = filters, + filters = pushedFilters, options = optParams, hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() ) @@ -132,9 +155,14 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, rdd.asInstanceOf[RDD[Row]] } - def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = { - val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths) - val fileStatuses = inMemoryFileIndex.allFiles() + def buildFileIndex(filters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { + val selectedPartitions = inMemoryFileIndex.listFiles(filters, filters) + val selectedPartitionsPathMap = selectedPartitions.flatMap(x=>{ + val files = x.files + val fileMap = files.map(file=>{(file.getPath.getName,x.values)}) + fileMap + }).toMap + val fileStatuses = selectedPartitions.flatMap(_.files) if (fileStatuses.isEmpty) { 
      throw new HoodieException("No files found for reading in user provided path.")
    }
@@ -148,10 +176,22 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
     val fileSplits = fileGroup.map(kv => {
       val baseFile = kv._1
       val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
-      val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen)
+      val partitionValues = selectedPartitionsPathMap.get(baseFile.getFileName).get
+      val partitionedFile = PartitionedFile(partitionValues, baseFile.getPath, 0, baseFile.getFileLen)
       HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
     }).toList
     fileSplits
   }
+
+  private def getColName(f: StructField): String = {
+    if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
+      f.name
+    } else {
+      f.name.toLowerCase(Locale.ROOT)
+    }
+  }
+
+  @VisibleForTesting
+  def getFileIndexPaths = fileIndex.map(x => x.dataFile.get.filePath)
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/execution/hudi/utils/PushDownUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/execution/hudi/utils/PushDownUtils.scala
new file mode 100644
index 0000000000000..7a6e541b62921
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/execution/hudi/utils/PushDownUtils.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.hudi.utils
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy.translateFilter
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+
+/**
+ * This util object uses the protected DataSourceStrategy.translateFilter method, and is therefore placed under the org.apache.spark.sql package.
+ * [https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L439]
+ */
+object PushDownUtils {
+
+  /**
+   * Tries to translate a Catalyst Seq[Expression] into data source Array[Filter].
+ */ + def transformFilter(filterPredicates: Seq[Expression]): Array[Filter] = { + val translatedMap: Map[Expression, Filter] = filterPredicates.flatMap { p => + translateFilter(p).map(f => p -> f) + }.toMap + translatedMap.values.toArray + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 92024a3c0205c..8d3acb363c52d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -17,17 +17,24 @@ package org.apache.hudi.functional +import java.text.MessageFormat + +import org.apache.avro.generic.GenericRecord import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY} +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.DefaultHoodieRecordPayload import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, MergeOnReadSnapshotRelation} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings -import org.apache.hudi.keygen.NonpartitionedKeyGenerator +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator} import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.log4j.LogManager import org.apache.spark.sql._ +import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.functions._ import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -504,6 +511,146 @@ class TestMORDataSource extends HoodieClientTestBase { hudiSnapshotDF2.show(1) } + @Test + def testPrunePartitionsWithDataSchemaIncludePartitionsSchema() { + // First Operation: + // Producing parquet files to three hive style partitions like /partition=20150316/. + // SNAPSHOT view on MOR table with parquet files only. 
+    dataGen.setPartitionPaths(Array("20150316", "20150317", "20160315"))
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .option("basePath", basePath)
+      .load(basePath + "/*/*")
+
+    hudiSnapshotDF1.createOrReplaceTempView("mor_test_partition_table")
+    val sql = "select * from mor_test_partition_table where partition > '20150415'"
+
+    val plan = spark.sql(sql).queryExecution.sparkPlan
+    val mergeOnReadSnapshotRelation = plan.collect {
+      case scan: DataSourceScanExec => scan.relation
+    }.head.asInstanceOf[MergeOnReadSnapshotRelation]
+
+    val fileIndexPaths = mergeOnReadSnapshotRelation.getFileIndexPaths
+
+    // path
+    // └── to
+    //     └── table
+    //         ├── partition=20150316
+    //         │   └── data.parquet
+    //         ├── partition=20150317
+    //         │   └── data.parquet
+    //         └── partition=20160315
+    //             └── data.parquet
+
+    // only ../partition=20160315/data.parquet remains after applying the filter partition > '20150415'
+    assertEquals(1, fileIndexPaths.length)
+    assertTrue(fileIndexPaths.head.contains("20160315"))
+  }
+
+  @Test
+  def testPrunePartitionsWithAppendedPartitionsSchema() {
+    spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)
+    assertTrue(spark.conf.get("spark.sql.parquet.enableVectorizedReader").toBoolean)
+    // First Operation:
+    // Producing parquet files into three Hive-style partitions like year=2015/month=03/day=16,
+    // derived by splitting the partition column value (2015/03/16).
+    // SNAPSHOT view on MOR table with parquet files only.
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, classOf[InferPartitionsKeyGeneratorForTesting].getName)
+      .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .option("basePath", basePath)
+      .option("mergeSchema", "true")
+      .load(basePath + "/*/*/*/*")
+
+    hudiSnapshotDF1.createOrReplaceTempView("mor_test_partition_table")
+    val sql = "select tip_history,year,month,day from mor_test_partition_table where year < '2016' and month > '01'"
+
+    val plan = spark.sql(sql).queryExecution.sparkPlan
+    val mergeOnReadSnapshotRelation = plan.collect {
+      case scan: DataSourceScanExec => scan.relation
+    }.head.asInstanceOf[MergeOnReadSnapshotRelation]
+
+    val fileIndexPaths = mergeOnReadSnapshotRelation.getFileIndexPaths
+
+    // we have three partition values: "2015/03/16", "2015/03/17", "2016/03/15"; the path layout is:
+    // path
+    // └── to
+    //     └── table
+    //         ├── year=2015
+    //         │   └── month=03
+    //         │       ├── day=16
+    //         │       │   └── data.parquet
+    //         │       └── day=17
+    //         │           └── data.parquet
+    //         └── year=2016
+    //             └── month=03
+    //                 └── day=15
+    //                     └── data.parquet
+
+    // only year=2015/month=03/day=16/data.parquet and ../day=17/data.parquet remain after applying
+    // the filter year < '2016' and month > '01'
+    assertEquals(2, fileIndexPaths.length)
+    assertTrue(fileIndexPaths.head.contains("year=2015/month=03"))
+    assertTrue(fileIndexPaths(1).contains("year=2015/month=03"))
+  }
+
+  @Test
+  def testPrunePartitionsWithMisplacedReturnData() {
+    spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)
+    assertTrue(spark.conf.get("spark.sql.parquet.enableVectorizedReader").toBoolean)
+    // First Operation:
+    // Producing parquet files into three Hive-style partitions like year=2015/month=03/day=16,
+    // derived by splitting the partition column value (2015/03/16).
+    // SNAPSHOT view on MOR table with parquet files only.
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, classOf[InferPartitionsKeyGeneratorForTesting].getName)
+      .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .option("basePath", basePath)
+      .option("mergeSchema", "true")
+      .load(basePath + "/*/*/*/*")
+
+    hudiSnapshotDF1.createOrReplaceTempView("mor_test_partition_table")
+    //hudiSnapshotDF1.printSchema()
+
+    // selecting data columns before the partition columns returns correct data
+    val dataSchema_partitionSchema_sql = "select tip_history,year,month,day from mor_test_partition_table where year < '2016' and month > '01'"
+    spark.sql(dataSchema_partitionSchema_sql).show(1)
+
+    // selecting a partition column before the data columns returns misplaced data
+    val partitionSchema_dataSchema_sql = "select year,tip_history,month,day from mor_test_partition_table where year < '2016' and month > '01'"
+    spark.sql(partitionSchema_dataSchema_sql).show(1)
+  }
+
   @Test
   def testPreCombineFiledForReadMOR(): Unit = {
     writeData((1, "a0",10, 100))
@@ -563,3 +710,22 @@ class TestMORDataSource extends HoodieClientTestBase {
     df.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history").show(1)
   }
 }
+
+/**
+ * Key generator used by tests where the data schema does not contain the partition columns;
+ * the partition path (year/month/day) is derived by splitting the partition field value (e.g. 2015/03/16).
+ */
+private class InferPartitionsKeyGeneratorForTesting(val props: TypedProperties, val recordKeyField: String)
+  extends SimpleKeyGenerator(props) {
+
+  private val partitionPathTemplate = "year={0}/month={1}/day={2}"
+
+  def this(props: TypedProperties) {
+    this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY))
+  }
+
+  override def getPartitionPath(record: GenericRecord): String = {
+    val dateVal = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathFields.get(0), true)
+    val dateArr = dateVal.split("/")
+    MessageFormat.format(partitionPathTemplate, dateArr(0), dateArr(1), dateArr(2))
+  }
+}
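
The new TablePathUtils.isHoodieMetaPath helper mirrors the check that HoodieROTablePathFilter previously inlined: a path is a metadata path if it ends with "/.hoodie" or lies inside the ".hoodie" folder. A minimal spot-check of that intended behavior, assuming the metadata folder name is ".hoodie"; the example paths are made up:

import org.apache.hudi.common.util.TablePathUtils

object IsHoodieMetaPathCheck {
  def main(args: Array[String]): Unit = {
    // the meta folder itself
    assert(TablePathUtils.isHoodieMetaPath("s3://bucket/tbl/.hoodie"))
    // a file inside the meta folder
    assert(TablePathUtils.isHoodieMetaPath("s3://bucket/tbl/.hoodie/20210101000000.commit"))
    // a regular data file outside the meta folder
    assert(!TablePathUtils.isHoodieMetaPath("s3://bucket/tbl/partition=20160315/f1.parquet"))
  }
}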
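
The schema override added to MergeOnReadSnapshotRelation follows essentially the overlap rule Spark uses for HadoopFsRelation: column order comes from the data schema, while overlapping columns take the partition schema's data type. A small sketch of that rule with hypothetical column names (not the relation's actual code path, and ignoring case sensitivity):

import org.apache.spark.sql.types._

object SchemaMergeSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical schemas: the data files already contain a "partition" column as a string,
    // while partition discovery infers the same column as an int from hive-style paths.
    val dataSchema = StructType(Seq(
      StructField("_row_key", StringType),
      StructField("partition", StringType),
      StructField("fare", DoubleType)))
    val partitionSchema = StructType(Seq(StructField("partition", IntegerType)))

    val overlapped = partitionSchema
      .filter(p => dataSchema.exists(_.name == p.name))
      .map(p => p.name -> p).toMap

    // Order follows the data schema; the overlapping column takes the partition schema's type.
    val merged = StructType(
      dataSchema.map(f => overlapped.getOrElse(f.name, f)) ++
        partitionSchema.filterNot(f => overlapped.contains(f.name)))

    println(merged.simpleString) // struct<_row_key:string,partition:int,fare:double>
  }
}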
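
On the read side, the tests above reduce to the following usage sketch, assuming a SparkSession named spark and a hive-style partitioned MOR table already written to basePath as in testPrunePartitionsWithDataSchemaIncludePartitionsSchema; the partition values are illustrative:

import org.apache.hudi.DataSourceReadOptions

val hudiDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .option("basePath", basePath)
  .load(basePath + "/*/*")

// Because the relation now implements CatalystScan, the partition predicate reaches
// buildScan as a Catalyst Expression, and buildFileIndex(filters) only lists files
// under partition=20160315 instead of listing every partition.
hudiDF.filter("partition > '20150415'").show()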