From 14d739908bd0ab3f2e23b22fa63520132d66c39e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 26 May 2022 17:07:53 -0700 Subject: [PATCH 01/10] Flip the config by default to fall back to previous behavior of appending partition column values parsed from partition path --- .../src/main/scala/org/apache/hudi/DataSourceOptions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index ac4d0e5794392..0a6494c22da5b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -131,7 +131,7 @@ object DataSourceReadOptions { val EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH: ConfigProperty[Boolean] = ConfigProperty.key("hoodie.datasource.read.extract.partition.values.from.path") - .defaultValue(false) + .defaultValue(true) .sinceVersion("0.11.0") .withDocumentation("When set to true, values for partition columns (partition values) will be extracted" + " from physical partition path (default Spark behavior). 
When set to false partition values will be" + From 9c5b1b3e02474e4da6d9cd610378d6f7055034f7 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 26 May 2022 17:09:49 -0700 Subject: [PATCH 02/10] Fixed conditional incorrectly skipping parsing partition values --- .../main/scala/org/apache/hudi/HoodieBaseRelation.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 4b7177f4d6326..2094407ab90d9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -399,7 +399,9 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = { try { val tableConfig = metaClient.getTableConfig - if (shouldExtractPartitionValuesFromPartitionPath) { + if (partitionColumns.isEmpty) { + InternalRow.empty + } else { val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean if (hiveStylePartitioningEnabled) { @@ -414,12 +416,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, InternalRow.fromSeq(parts.map(UTF8String.fromString)) } } - } else { - InternalRow.empty } } catch { case NonFatal(e) => - logWarning(s"Failed to get the right partition InternalRow for file : ${file.toString}") + logWarning(s"Failed to get the right partition InternalRow for file: ${file.toString}", e) InternalRow.empty } } From ebff8313ef61b7e7f5b8c94f3b84347385d3f8e0 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 26 May 2022 18:02:57 -0700 Subject: [PATCH 03/10] Removing partition 
filters conversion for `TimestampBasedKeyGenerator`s; Reverting tests --- .../org/apache/hudi/HoodieFileIndex.scala | 5 +---- .../hudi/MergeOnReadSnapshotRelation.scala | 7 ++----- .../hudi/functional/TestCOWDataSource.scala | 13 ++++++++---- .../hudi/functional/TestMORDataSource.scala | 21 +++++++++++++------ 4 files changed, 27 insertions(+), 19 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 08d0d722b2f68..d73e3a5d3b934 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -108,9 +108,6 @@ case class HoodieFileIndex(spark: SparkSession, * @return list of PartitionDirectory containing partition to base files mapping */ override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - val convertedPartitionFilters = - HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters) - // Look up candidate files names in the col-stats index, if all of the following conditions are true // - Data-skipping is enabled // - Col-Stats Index is present @@ -144,7 +141,7 @@ case class HoodieFileIndex(spark: SparkSession, Seq(PartitionDirectory(InternalRow.empty, candidateFiles)) } else { // Prune the partition path by the partition filters - val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters) + val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, partitionFilters) var totalFileSize = 0 var candidateFileSize = 0 diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 75bc96624e7b0..bd5d0a12743bf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -97,15 +97,12 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, } protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { - val convertedPartitionFilters = - HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters) - if (globPaths.isEmpty) { - val fileSlices = fileIndex.listFileSlices(convertedPartitionFilters) + val fileSlices = fileIndex.listFileSlices(partitionFilters) buildSplits(fileSlices.values.flatten.toSeq) } else { // TODO refactor to avoid iterating over listed files multiple times - val partitions = listLatestBaseFiles(globPaths, convertedPartitionFilters, dataFilters) + val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters) val partitionPaths = partitions.keys.toSeq if (partitionPaths.isEmpty || latestInstant.isEmpty) { // If this an empty table OR it has no completed commits yet, return diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 088ec1faabf73..086033de712e1 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -144,7 +144,7 @@ class TestCOWDataSource extends HoodieClientTestBase { def testPrunePartitionForTimestampBasedKeyGenerator(): Unit = { val options = commonOpts ++ Map( "hoodie.compact.inline" -> 
"false", - DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, + DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.TimestampBasedKeyGenerator", Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING", Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd", @@ -176,8 +176,11 @@ class TestCOWDataSource extends HoodieClientTestBase { // snapshot query val snapshotQueryRes = spark.read.format("hudi").load(basePath) - assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20) - assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30) + // TODO(HUDI-3204) this had to be reverted to existing behavior + //assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20) + //assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30) + assertEquals(snapshotQueryRes.where("partition = '2022/01/01'").count, 20) + assertEquals(snapshotQueryRes.where("partition = '2022/01/02'").count, 30) // incremental query val incrementalQueryRes = spark.read.format("hudi") @@ -962,7 +965,9 @@ class TestCOWDataSource extends HoodieClientTestBase { // data_date is the partition field. Persist to the parquet file using the origin values, and read it. 
assertEquals( - Seq("2018-09-23", "2018-09-24"), + // TODO(HUDI-3204) this had to be reverted to existing behavior + //Seq("2018-09-23", "2018-09-24"), + Seq("2018/09/23", "2018/09/24"), firstDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq ) assertEquals( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 96514603efdcd..8b85d225a59da 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -857,15 +857,21 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit2Time'").count, 40) assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit3Time'").count, 20) - assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 50) - assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 60) + // TODO(HUDI-3204) this had to be reverted to existing behavior + //assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 50) + //assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 60) + assertEquals(snapshotQueryRes.where("partition = '2022/01/01'").count, 50) + assertEquals(snapshotQueryRes.where("partition = '2022/01/02'").count, 60) // read_optimized query val readOptimizedQueryRes = spark.read.format("hudi") .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) .load(basePath) - assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50) - assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60) + // TODO(HUDI-3204) this had to be reverted to existing behavior + 
//assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50) + //assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60) + assertEquals(readOptimizedQueryRes.where("partition = '2022/01/01'").count, 50) + assertEquals(readOptimizedQueryRes.where("partition = '2022/01/02'").count, 60) // incremental query val incrementalQueryRes = spark.read.format("hudi") @@ -873,7 +879,10 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commit2Time) .option(DataSourceReadOptions.END_INSTANTTIME.key, commit3Time) .load(basePath) - assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0) - assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 20) + // TODO(HUDI-3204) this had to be reverted to existing behavior + //assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0) + //assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 20) + assertEquals(incrementalQueryRes.where("partition = '2022/01/01'").count, 0) + assertEquals(incrementalQueryRes.where("partition = '2022/01/02'").count, 20) } } From 9d076a9d16efa4a6b0fb8685ae535c9af8c0de83 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 26 May 2022 19:32:26 -0700 Subject: [PATCH 04/10] Reset config to "false" by default, instead enabled partition values extraction from partition path only for COW; --- .../src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala | 2 ++ .../src/main/scala/org/apache/hudi/DataSourceOptions.scala | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index c57f46a7b6639..92c69d87aa131 100644 --- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -54,6 +54,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, override type FileSplit = HoodieBaseFileSplit + override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = true + override lazy val mandatoryFields: Seq[String] = // TODO reconcile, record's key shouldn't be mandatory for base-file only relation Seq(recordKeyField) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 0a6494c22da5b..ac4d0e5794392 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -131,7 +131,7 @@ object DataSourceReadOptions { val EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH: ConfigProperty[Boolean] = ConfigProperty.key("hoodie.datasource.read.extract.partition.values.from.path") - .defaultValue(true) + .defaultValue(false) .sinceVersion("0.11.0") .withDocumentation("When set to true, values for partition columns (partition values) will be extracted" + " from physical partition path (default Spark behavior). 
When set to false partition values will be" + From b85426ec657da6e4a4664f1ad3a95f85be982432 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 26 May 2022 19:41:09 -0700 Subject: [PATCH 05/10] Reverting changes --- .../hudi/MergeOnReadSnapshotRelation.scala | 7 +++++-- .../hudi/functional/TestMORDataSource.scala | 21 ++++++------------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index bd5d0a12743bf..75bc96624e7b0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -97,12 +97,15 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, } protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { + val convertedPartitionFilters = + HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters) + if (globPaths.isEmpty) { - val fileSlices = fileIndex.listFileSlices(partitionFilters) + val fileSlices = fileIndex.listFileSlices(convertedPartitionFilters) buildSplits(fileSlices.values.flatten.toSeq) } else { // TODO refactor to avoid iterating over listed files multiple times - val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters) + val partitions = listLatestBaseFiles(globPaths, convertedPartitionFilters, dataFilters) val partitionPaths = partitions.keys.toSeq if (partitionPaths.isEmpty || latestInstant.isEmpty) { // If this an empty table OR it has no completed commits yet, return diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 8b85d225a59da..518d7fefddfec 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -17,11 +17,10 @@ package org.apache.hudi.functional -import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, HoodieRecordPayload, HoodieTableType} +import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings @@ -30,9 +29,8 @@ import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.keygen.NonpartitionedKeyGenerator import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase} -import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils, SparkDatasetMixin} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin} import org.apache.log4j.LogManager -import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.BooleanType @@ -41,7 +39,6 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.CsvSource -import java.util import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ 
-857,11 +854,8 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit2Time'").count, 40) assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit3Time'").count, 20) - // TODO(HUDI-3204) this had to be reverted to existing behavior - //assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 50) - //assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 60) - assertEquals(snapshotQueryRes.where("partition = '2022/01/01'").count, 50) - assertEquals(snapshotQueryRes.where("partition = '2022/01/02'").count, 60) + assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 50) + assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 60) // read_optimized query val readOptimizedQueryRes = spark.read.format("hudi") @@ -879,10 +873,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commit2Time) .option(DataSourceReadOptions.END_INSTANTTIME.key, commit3Time) .load(basePath) - // TODO(HUDI-3204) this had to be reverted to existing behavior - //assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0) - //assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 20) - assertEquals(incrementalQueryRes.where("partition = '2022/01/01'").count, 0) - assertEquals(incrementalQueryRes.where("partition = '2022/01/02'").count, 20) + assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0) + assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 20) } } From 55796feecde7c872f699a6474ad081726500764d Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 26 May 2022 19:45:15 -0700 Subject: [PATCH 06/10] Added notes --- .../src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index 92c69d87aa131..d4c13db5d15dc 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -54,6 +54,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, override type FileSplit = HoodieBaseFileSplit + // TODO(HUDI-3204) this is to override behavior (exclusively) for COW tables to always extract + // partition values from partition path + // For more details please check HUDI-4161 override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = true override lazy val mandatoryFields: Seq[String] = From a500ed845bef16b4329ec23be89be32a43fdf60f Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 26 May 2022 20:03:19 -0700 Subject: [PATCH 07/10] Fixing tests --- .../hudi/functional/TestCOWDataSource.scala | 16 +++++++++------- .../hudi/functional/TestMORDataSource.scala | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 086033de712e1..7c86da0c9e362 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -176,7 +176,7 @@ class TestCOWDataSource extends HoodieClientTestBase { // snapshot query val snapshotQueryRes = spark.read.format("hudi").load(basePath) - // TODO(HUDI-3204) this had to be reverted to existing behavior + // TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10 
//assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20) //assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30) assertEquals(snapshotQueryRes.where("partition = '2022/01/01'").count, 20) @@ -964,12 +964,14 @@ class TestCOWDataSource extends HoodieClientTestBase { assert(firstDF.count() == 2) // data_date is the partition field. Persist to the parquet file using the origin values, and read it. - assertEquals( - // TODO(HUDI-3204) this had to be reverted to existing behavior - //Seq("2018-09-23", "2018-09-24"), - Seq("2018/09/23", "2018/09/24"), - firstDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq - ) + // TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10 + val expectedValues = if (useGlobbing) { + Seq("2018-09-23", "2018-09-24") + } else { + Seq("2018/09/23", "2018/09/24") + } + + assertEquals(expectedValues, firstDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq) assertEquals( Seq("2018/09/23", "2018/09/24"), firstDF.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 518d7fefddfec..f9f14438933f3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -861,7 +861,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { val readOptimizedQueryRes = spark.read.format("hudi") .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) .load(basePath) - // TODO(HUDI-3204) this had to be reverted to existing behavior + // TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10 
//assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50) //assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60) assertEquals(readOptimizedQueryRes.where("partition = '2022/01/01'").count, 50) From 10ce9e596c7180f8b7bd58df078058f6db4762af Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 26 May 2022 20:29:46 -0700 Subject: [PATCH 08/10] Reverting changes --- .../src/main/scala/org/apache/hudi/HoodieBaseRelation.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 2094407ab90d9..b5c8453023dd7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -399,9 +399,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = { try { val tableConfig = metaClient.getTableConfig - if (partitionColumns.isEmpty) { - InternalRow.empty - } else { + if (shouldExtractPartitionValuesFromPartitionPath) { val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean if (hiveStylePartitioningEnabled) { @@ -416,6 +414,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, InternalRow.fromSeq(parts.map(UTF8String.fromString)) } } + } else { + InternalRow.empty } } catch { case NonFatal(e) => From fa5feff4aa02ca21bf23558322e92350fd0137fa Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 26 May 2022 21:49:01 -0700 Subject: [PATCH 09/10] Make sure partition values are not extracted when Schema Evolution is enabled 
--- .../main/scala/org/apache/hudi/BaseFileOnlyRelation.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index d4c13db5d15dc..4160c34b0ce64 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -57,7 +57,12 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, // TODO(HUDI-3204) this is to override behavior (exclusively) for COW tables to always extract // partition values from partition path // For more details please check HUDI-4161 - override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = true + // NOTE: This override has to mirror semantic of whenever this Relation is converted into [[HadoopFsRelation]], + // which is currently done for all cases, except when Schema Evolution is enabled + override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = { + val enableSchemaOnRead = !internalSchema.isEmptySchema + !enableSchemaOnRead + } override lazy val mandatoryFields: Seq[String] = // TODO reconcile, record's key shouldn't be mandatory for base-file only relation From 49c0ebb6f3a8a40eb814266f4b3abf8ffbd6b488 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 26 May 2022 21:53:09 -0700 Subject: [PATCH 10/10] Typo --- .../src/main/scala/org/apache/hudi/HoodieBaseRelation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index b5c8453023dd7..08f87816d7c35 100644 --- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -171,7 +171,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = { // Controls whether partition columns (which are the source for the partition path values) should // be omitted from persistence in the data files. On the read path it affects whether partition values (values - // of partition columns) will be read from the data file ot extracted from partition path + // of partition columns) will be read from the data file or extracted from partition path val shouldOmitPartitionColumns = metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty val shouldExtractPartitionValueFromPath = optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,